From 132ca9bdd8c2d32304ae641db01015dff131b844 Mon Sep 17 00:00:00 2001 From: Anand Kumar Sankaran Date: Fri, 16 Jan 2026 17:35:56 -0800 Subject: [PATCH 01/13] feat(events): Enable metrics event emission in reportMetrics() This commit adds the core infrastructure for emitting metrics as events when reportMetrics() is called on the Iceberg REST catalog API. Changes: - Add REPORT_METRICS_REQUEST attribute to EventAttributes.java - Add BEFORE_REPORT_METRICS and AFTER_REPORT_METRICS to PolarisEventType.java - Update reportMetrics() in IcebergRestCatalogEventServiceDelegator.java to emit BEFORE/AFTER events with catalog name, namespace, table, and request - Add ReportMetricsEventTest.java with unit tests verifying event emission This enables event listeners to receive metrics report events, allowing for use cases like audit logging and metrics persistence. Added tests and a Feature flag --- CHANGELOG.md | 1 + .../core/config/FeatureConfiguration.java | 19 ++ ...ebergRestCatalogEventServiceDelegator.java | 44 +++- .../service/events/EventAttributes.java | 5 + .../service/events/PolarisEventType.java | 4 + .../iceberg/ReportMetricsEventTest.java | 238 ++++++++++++++++++ .../apache/polaris/service/TestServices.java | 3 +- .../listeners/TestPolarisEventListener.java | 13 + 8 files changed, 323 insertions(+), 4 deletions(-) create mode 100644 runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/ReportMetricsEventTest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 93e51deab4..63cd7b733c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,6 +55,7 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti ### New Features +- Added `ENABLE_METRICS_EVENT_EMISSION` feature flag (default: false) to control the emission of `BEFORE_REPORT_METRICS` and `AFTER_REPORT_METRICS` events when the Iceberg REST catalog API's `reportMetrics()` method is called. When enabled, event listeners can receive metrics report data for use cases like audit logging and metrics persistence. Can be configured via `polaris.features."ENABLE_METRICS_EVENT_EMISSION"=true`. - Added `--no-sts` flag to CLI to support S3-compatible storage systems that do not have Security Token Service available. - Support credential vending for federated catalogs. `ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING` (default: true) was added to toggle this feature. - Enhanced catalog federation with SigV4 authentication support, additional authentication types for credential vending, and location-based access restrictions to block credential vending for remote tables outside allowed location lists. diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java index 1eb121c498..15ac0ca8d6 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java @@ -516,4 +516,23 @@ public static void enforceFeatureEnabledOrThrow( + "Helps prevent thundering herd when multiple requests fail simultaneously.") .defaultValue(0.5) .buildFeatureConfiguration(); + + /** + * Feature flag to control the emission of BEFORE_REPORT_METRICS and AFTER_REPORT_METRICS events + * when the Iceberg REST catalog API's reportMetrics() method is called. When disabled (default), + * the reportMetrics() method calls the delegate directly without emitting any events. When + * enabled, BEFORE_REPORT_METRICS and AFTER_REPORT_METRICS events are emitted, allowing event + * listeners to receive metrics report data for use cases like audit logging and metrics + * persistence. + */ + public static final FeatureConfiguration ENABLE_METRICS_EVENT_EMISSION = + PolarisConfiguration.builder() + .key("ENABLE_METRICS_EVENT_EMISSION") + .description( + "If set to true, emit BEFORE_REPORT_METRICS and AFTER_REPORT_METRICS events when " + + "the reportMetrics() API is called. This enables event listeners to receive " + + "metrics report data for use cases like audit logging and metrics persistence. " + + "Defaults to false to ensure backward compatibility.") + .defaultValue(false) + .buildFeatureConfiguration(); } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java index 3a8a35e9e2..1a09a513f7 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java @@ -41,6 +41,8 @@ import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.rest.responses.LoadViewResponse; import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; +import org.apache.polaris.core.config.FeatureConfiguration; +import org.apache.polaris.core.config.RealmConfig; import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.service.catalog.CatalogPrefixParser; import org.apache.polaris.service.catalog.api.IcebergRestCatalogApiService; @@ -64,6 +66,7 @@ public class IcebergRestCatalogEventServiceDelegator @Inject PolarisEventListener polarisEventListener; @Inject PolarisEventMetadataFactory eventMetadataFactory; @Inject CatalogPrefixParser prefixParser; + @Inject RealmConfig realmConfig; // Constructor for testing - allows manual dependency injection @VisibleForTesting @@ -71,11 +74,13 @@ public IcebergRestCatalogEventServiceDelegator( IcebergCatalogAdapter delegate, PolarisEventListener polarisEventListener, PolarisEventMetadataFactory eventMetadataFactory, - CatalogPrefixParser prefixParser) { + CatalogPrefixParser prefixParser, + RealmConfig realmConfig) { this.delegate = delegate; this.polarisEventListener = polarisEventListener; this.eventMetadataFactory = eventMetadataFactory; this.prefixParser = prefixParser; + this.realmConfig = realmConfig; } // Default constructor for CDI @@ -805,8 +810,41 @@ public Response reportMetrics( ReportMetricsRequest reportMetricsRequest, RealmContext realmContext, SecurityContext securityContext) { - return delegate.reportMetrics( - prefix, namespace, table, reportMetricsRequest, realmContext, securityContext); + // Check if metrics event emission is enabled + boolean metricsEventEmissionEnabled = + realmConfig.getConfig(FeatureConfiguration.ENABLE_METRICS_EVENT_EMISSION); + + // If metrics event emission is disabled, call delegate directly without emitting events + if (!metricsEventEmissionEnabled) { + return delegate.reportMetrics( + prefix, namespace, table, reportMetricsRequest, realmContext, securityContext); + } + + // Emit events when feature is enabled + String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); + Namespace namespaceObj = decodeNamespace(namespace); + polarisEventListener.onEvent( + new PolarisEvent( + PolarisEventType.BEFORE_REPORT_METRICS, + eventMetadataFactory.create(), + new AttributeMap() + .put(EventAttributes.CATALOG_NAME, catalogName) + .put(EventAttributes.NAMESPACE, namespaceObj) + .put(EventAttributes.TABLE_NAME, table) + .put(EventAttributes.REPORT_METRICS_REQUEST, reportMetricsRequest))); + Response resp = + delegate.reportMetrics( + prefix, namespace, table, reportMetricsRequest, realmContext, securityContext); + polarisEventListener.onEvent( + new PolarisEvent( + PolarisEventType.AFTER_REPORT_METRICS, + eventMetadataFactory.create(), + new AttributeMap() + .put(EventAttributes.CATALOG_NAME, catalogName) + .put(EventAttributes.NAMESPACE, namespaceObj) + .put(EventAttributes.TABLE_NAME, table) + .put(EventAttributes.REPORT_METRICS_REQUEST, reportMetricsRequest))); + return resp; } @Override diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/EventAttributes.java b/runtime/service/src/main/java/org/apache/polaris/service/events/EventAttributes.java index 7c62f84c31..cae6928de4 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/EventAttributes.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/EventAttributes.java @@ -28,6 +28,7 @@ import org.apache.iceberg.rest.requests.CreateViewRequest; import org.apache.iceberg.rest.requests.RegisterTableRequest; import org.apache.iceberg.rest.requests.RenameTableRequest; +import org.apache.iceberg.rest.requests.ReportMetricsRequest; import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest; import org.apache.iceberg.rest.requests.UpdateTableRequest; import org.apache.iceberg.rest.responses.ConfigResponse; @@ -230,4 +231,8 @@ private EventAttributes() {} new AttributeKey<>("detach_policy_request", DetachPolicyRequest.class); public static final AttributeKey GET_APPLICABLE_POLICIES_RESPONSE = new AttributeKey<>("get_applicable_policies_response", GetApplicablePoliciesResponse.class); + + // Metrics reporting attributes + public static final AttributeKey REPORT_METRICS_REQUEST = + new AttributeKey<>("report_metrics_request", ReportMetricsRequest.class); } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventType.java b/runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventType.java index 949b9cb24e..bb8345bcee 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventType.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventType.java @@ -200,4 +200,8 @@ public enum PolarisEventType { // Rate Limiting Events BEFORE_LIMIT_REQUEST_RATE, + + // Metrics Reporting Events + BEFORE_REPORT_METRICS, + AFTER_REPORT_METRICS, } diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/ReportMetricsEventTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/ReportMetricsEventTest.java new file mode 100644 index 0000000000..7522347f28 --- /dev/null +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/ReportMetricsEventTest.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.catalog.iceberg; + +import static org.apache.polaris.service.admin.PolarisAuthzTestBase.SCHEMA; +import static org.assertj.core.api.Assertions.assertThat; + +import jakarta.ws.rs.core.Response; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.metrics.ImmutableScanReport; +import org.apache.iceberg.metrics.ScanMetrics; +import org.apache.iceberg.metrics.ScanMetricsResult; +import org.apache.iceberg.rest.requests.CreateNamespaceRequest; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.requests.ReportMetricsRequest; +import org.apache.polaris.core.admin.model.Catalog; +import org.apache.polaris.core.admin.model.CatalogProperties; +import org.apache.polaris.core.admin.model.CreateCatalogRequest; +import org.apache.polaris.core.admin.model.FileStorageConfigInfo; +import org.apache.polaris.core.admin.model.StorageConfigInfo; +import org.apache.polaris.service.TestServices; +import org.apache.polaris.service.events.EventAttributes; +import org.apache.polaris.service.events.PolarisEvent; +import org.apache.polaris.service.events.PolarisEventType; +import org.apache.polaris.service.events.listeners.TestPolarisEventListener; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +/** + * Unit tests for verifying that reportMetrics() emits BEFORE_REPORT_METRICS and + * AFTER_REPORT_METRICS events. + */ +public class ReportMetricsEventTest { + private static final String NAMESPACE = "test_ns"; + private static final String CATALOG = "test-catalog"; + private static final String TABLE = "test-table"; + + private String catalogLocation; + + @BeforeEach + public void setUp(@TempDir Path tempDir) { + catalogLocation = tempDir.toAbsolutePath().toUri().toString(); + if (catalogLocation.endsWith("/")) { + catalogLocation = catalogLocation.substring(0, catalogLocation.length() - 1); + } + } + + @Test + void testReportMetricsEmitsBeforeAndAfterEventsWhenEnabled() { + // Create test services with ENABLE_METRICS_EVENT_EMISSION enabled + TestServices testServices = createTestServicesWithMetricsEmissionEnabled(true); + createCatalogAndNamespace(testServices); + createTable(testServices, TABLE); + + // Create a ScanReport for testing + ImmutableScanReport scanReport = + ImmutableScanReport.builder() + .schemaId(0) + .tableName(NAMESPACE + "." + TABLE) + .snapshotId(100L) + .addProjectedFieldIds(1) + .addProjectedFieldNames("id") + .filter(Expressions.alwaysTrue()) + .scanMetrics(ScanMetricsResult.fromScanMetrics(ScanMetrics.noop())) + .build(); + + ReportMetricsRequest request = ReportMetricsRequest.of(scanReport); + + // Call reportMetrics + try (Response response = + testServices + .restApi() + .reportMetrics( + CATALOG, + NAMESPACE, + TABLE, + request, + testServices.realmContext(), + testServices.securityContext())) { + assertThat(response.getStatus()).isEqualTo(Response.Status.NO_CONTENT.getStatusCode()); + } + + // Verify that BEFORE_REPORT_METRICS and AFTER_REPORT_METRICS events were emitted + TestPolarisEventListener testEventListener = + (TestPolarisEventListener) testServices.polarisEventListener(); + + PolarisEvent beforeEvent = testEventListener.getLatest(PolarisEventType.BEFORE_REPORT_METRICS); + assertThat(beforeEvent).isNotNull(); + assertThat(beforeEvent.attributes().getRequired(EventAttributes.CATALOG_NAME)) + .isEqualTo(CATALOG); + assertThat(beforeEvent.attributes().getRequired(EventAttributes.NAMESPACE)) + .isEqualTo(Namespace.of(NAMESPACE)); + assertThat(beforeEvent.attributes().getRequired(EventAttributes.TABLE_NAME)).isEqualTo(TABLE); + assertThat(beforeEvent.attributes().getRequired(EventAttributes.REPORT_METRICS_REQUEST)) + .isNotNull(); + + PolarisEvent afterEvent = testEventListener.getLatest(PolarisEventType.AFTER_REPORT_METRICS); + assertThat(afterEvent).isNotNull(); + assertThat(afterEvent.attributes().getRequired(EventAttributes.CATALOG_NAME)) + .isEqualTo(CATALOG); + assertThat(afterEvent.attributes().getRequired(EventAttributes.NAMESPACE)) + .isEqualTo(Namespace.of(NAMESPACE)); + assertThat(afterEvent.attributes().getRequired(EventAttributes.TABLE_NAME)).isEqualTo(TABLE); + assertThat(afterEvent.attributes().getRequired(EventAttributes.REPORT_METRICS_REQUEST)) + .isNotNull(); + } + + @Test + void testReportMetricsDoesNotEmitEventsWhenDisabled() { + // Create test services with ENABLE_METRICS_EVENT_EMISSION disabled (default) + TestServices testServices = createTestServicesWithMetricsEmissionEnabled(false); + createCatalogAndNamespace(testServices); + createTable(testServices, TABLE); + + // Create a ScanReport for testing + ImmutableScanReport scanReport = + ImmutableScanReport.builder() + .schemaId(0) + .tableName(NAMESPACE + "." + TABLE) + .snapshotId(100L) + .addProjectedFieldIds(1) + .addProjectedFieldNames("id") + .filter(Expressions.alwaysTrue()) + .scanMetrics(ScanMetricsResult.fromScanMetrics(ScanMetrics.noop())) + .build(); + + ReportMetricsRequest request = ReportMetricsRequest.of(scanReport); + + // Call reportMetrics + try (Response response = + testServices + .restApi() + .reportMetrics( + CATALOG, + NAMESPACE, + TABLE, + request, + testServices.realmContext(), + testServices.securityContext())) { + assertThat(response.getStatus()).isEqualTo(Response.Status.NO_CONTENT.getStatusCode()); + } + + // Verify that BEFORE_REPORT_METRICS and AFTER_REPORT_METRICS events were NOT emitted + TestPolarisEventListener testEventListener = + (TestPolarisEventListener) testServices.polarisEventListener(); + + assertThat(testEventListener.hasEvent(PolarisEventType.BEFORE_REPORT_METRICS)).isFalse(); + assertThat(testEventListener.hasEvent(PolarisEventType.AFTER_REPORT_METRICS)).isFalse(); + } + + private TestServices createTestServicesWithMetricsEmissionEnabled(boolean enabled) { + Map config = + Map.of( + "ALLOW_INSECURE_STORAGE_TYPES", + "true", + "SUPPORTED_CATALOG_STORAGE_TYPES", + List.of("FILE"), + "ENABLE_METRICS_EVENT_EMISSION", + String.valueOf(enabled)); + return TestServices.builder().config(config).withEventDelegator(true).build(); + } + + private void createCatalogAndNamespace(TestServices services) { + CatalogProperties.Builder propertiesBuilder = + CatalogProperties.builder() + .setDefaultBaseLocation(String.format("%s/%s", catalogLocation, CATALOG)); + + StorageConfigInfo config = + FileStorageConfigInfo.builder() + .setStorageType(StorageConfigInfo.StorageTypeEnum.FILE) + .build(); + Catalog catalogObject = + new Catalog( + Catalog.TypeEnum.INTERNAL, CATALOG, propertiesBuilder.build(), 0L, 0L, 1, config); + try (Response response = + services + .catalogsApi() + .createCatalog( + new CreateCatalogRequest(catalogObject), + services.realmContext(), + services.securityContext())) { + assertThat(response.getStatus()).isEqualTo(Response.Status.CREATED.getStatusCode()); + } + + CreateNamespaceRequest createNamespaceRequest = + CreateNamespaceRequest.builder().withNamespace(Namespace.of(NAMESPACE)).build(); + try (Response response = + services + .restApi() + .createNamespace( + CATALOG, + createNamespaceRequest, + services.realmContext(), + services.securityContext())) { + assertThat(response.getStatus()).isEqualTo(Response.Status.OK.getStatusCode()); + } + } + + private void createTable(TestServices services, String tableName) { + CreateTableRequest createTableRequest = + CreateTableRequest.builder() + .withName(tableName) + .withLocation( + String.format("%s/%s/%s/%s", catalogLocation, CATALOG, NAMESPACE, tableName)) + .withSchema(SCHEMA) + .build(); + services + .restApi() + .createTable( + CATALOG, + NAMESPACE, + createTableRequest, + null, + services.realmContext(), + services.securityContext()); + } +} diff --git a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java index 59af4b5a6c..3455c55c51 100644 --- a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java +++ b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java @@ -361,7 +361,8 @@ public String getAuthenticationScheme() { catalogService, polarisEventListener, eventMetadataFactory, - new DefaultCatalogPrefixParser()); + new DefaultCatalogPrefixParser(), + realmConfig); finalRestConfigurationService = new IcebergRestConfigurationEventServiceDelegator( catalogService, polarisEventListener, eventMetadataFactory); diff --git a/runtime/service/src/testFixtures/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java b/runtime/service/src/testFixtures/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java index e23a7a9264..a0973750b5 100644 --- a/runtime/service/src/testFixtures/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java +++ b/runtime/service/src/testFixtures/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java @@ -43,4 +43,17 @@ public PolarisEvent getLatest(PolarisEventType type) { } return latest; } + + /** + * Returns the latest event of the specified type, or null if no such event has been recorded. + * This is useful for tests that need to verify no event was emitted. + */ + public PolarisEvent getLatestOrNull(PolarisEventType type) { + return latestEvents.get(type); + } + + /** Returns true if an event of the specified type has been recorded. */ + public boolean hasEvent(PolarisEventType type) { + return latestEvents.containsKey(type); + } } From 0bc647e2f975a2b12dab8a6e8b73c12406c197e3 Mon Sep 17 00:00:00 2001 From: Anand Kumar Sankaran Date: Mon, 19 Jan 2026 09:31:22 -0800 Subject: [PATCH 02/13] refactor: Replace event-based metrics with MetricsProcessor interface This commit removes the event-based metrics reporting system and introduces a new MetricsProcessor interface with CDI support. This is the foundation for a simpler, more direct metrics processing architecture. Changes: - Remove ENABLE_METRICS_EVENT_EMISSION feature flag - Remove BEFORE_REPORT_METRICS and AFTER_REPORT_METRICS event types - Remove REPORT_METRICS_REQUEST event attribute - Remove event emission from IcebergRestCatalogEventServiceDelegator.reportMetrics() - Remove ReportMetricsEventTest - Add MetricsProcessor interface for processing metrics reports - Add MetricsProcessingContext with rich contextual information (realm ID, principal, request ID, OpenTelemetry trace context) - Add MetricsProcessorConfiguration for type-safe configuration - Add CDI producer in ServiceProducers for MetricsProcessor The new MetricsProcessor interface provides: - Simpler, more direct processing (no events) - Rich context with realm, principal, request ID, OTel trace - CDI-based extensibility via @Identifier annotations - Type-safe configuration Implementations will be added in subsequent PRs. This commit provides the foundational interfaces and CDI infrastructure. Backward compatibility: The existing PolarisMetricsReporter interface and configuration remain unchanged and functional. --- .polaris-work-notes.md | 18 ++ .../core/config/FeatureConfiguration.java | 18 -- ...ebergRestCatalogEventServiceDelegator.java | 39 +-- .../service/config/ServiceProducers.java | 42 ++++ .../service/events/EventAttributes.java | 4 - .../service/events/PolarisEventType.java | 4 - .../reporting/MetricsProcessingContext.java | 67 +++++ .../service/reporting/MetricsProcessor.java | 78 ++++++ .../MetricsProcessorConfiguration.java | 102 ++++++++ .../iceberg/ReportMetricsEventTest.java | 238 ------------------ 10 files changed, 311 insertions(+), 299 deletions(-) create mode 100644 .polaris-work-notes.md create mode 100644 runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessingContext.java create mode 100644 runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessor.java create mode 100644 runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessorConfiguration.java delete mode 100644 runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/ReportMetricsEventTest.java diff --git a/.polaris-work-notes.md b/.polaris-work-notes.md new file mode 100644 index 0000000000..cbd906acff --- /dev/null +++ b/.polaris-work-notes.md @@ -0,0 +1,18 @@ +# Polaris Work Notes + +## Active Branches + +### feat-3337-rest-catalog-metrics-table-merged +- **Remote**: `obelix74/polaris` (origin) +- **PR**: #3385 +- **Description**: Add metrics persistence with dual storage strategy for Iceberg table operations +- **Last Updated**: 2026-01-17 +- **Notes**: + - This branch is tracking `origin/feat-3337-rest-catalog-metrics-table-merged` + - Fixed failing tests by adding `ENABLE_METRICS_EVENT_EMISSION` feature flag + - To push: `git push origin feat-3337-rest-catalog-metrics-table-merged` + +## Git Remotes +- **origin**: https://github.com/obelix74/polaris.git (your fork) +- **upstream**: https://github.com/apache/polaris.git (Apache upstream) + diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java index 15ac0ca8d6..ef6ef692fb 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java @@ -517,22 +517,4 @@ public static void enforceFeatureEnabledOrThrow( .defaultValue(0.5) .buildFeatureConfiguration(); - /** - * Feature flag to control the emission of BEFORE_REPORT_METRICS and AFTER_REPORT_METRICS events - * when the Iceberg REST catalog API's reportMetrics() method is called. When disabled (default), - * the reportMetrics() method calls the delegate directly without emitting any events. When - * enabled, BEFORE_REPORT_METRICS and AFTER_REPORT_METRICS events are emitted, allowing event - * listeners to receive metrics report data for use cases like audit logging and metrics - * persistence. - */ - public static final FeatureConfiguration ENABLE_METRICS_EVENT_EMISSION = - PolarisConfiguration.builder() - .key("ENABLE_METRICS_EVENT_EMISSION") - .description( - "If set to true, emit BEFORE_REPORT_METRICS and AFTER_REPORT_METRICS events when " - + "the reportMetrics() API is called. This enables event listeners to receive " - + "metrics report data for use cases like audit logging and metrics persistence. " - + "Defaults to false to ensure backward compatibility.") - .defaultValue(false) - .buildFeatureConfiguration(); } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java index 1a09a513f7..34eacecc2b 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java @@ -810,41 +810,10 @@ public Response reportMetrics( ReportMetricsRequest reportMetricsRequest, RealmContext realmContext, SecurityContext securityContext) { - // Check if metrics event emission is enabled - boolean metricsEventEmissionEnabled = - realmConfig.getConfig(FeatureConfiguration.ENABLE_METRICS_EVENT_EMISSION); - - // If metrics event emission is disabled, call delegate directly without emitting events - if (!metricsEventEmissionEnabled) { - return delegate.reportMetrics( - prefix, namespace, table, reportMetricsRequest, realmContext, securityContext); - } - - // Emit events when feature is enabled - String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); - Namespace namespaceObj = decodeNamespace(namespace); - polarisEventListener.onEvent( - new PolarisEvent( - PolarisEventType.BEFORE_REPORT_METRICS, - eventMetadataFactory.create(), - new AttributeMap() - .put(EventAttributes.CATALOG_NAME, catalogName) - .put(EventAttributes.NAMESPACE, namespaceObj) - .put(EventAttributes.TABLE_NAME, table) - .put(EventAttributes.REPORT_METRICS_REQUEST, reportMetricsRequest))); - Response resp = - delegate.reportMetrics( - prefix, namespace, table, reportMetricsRequest, realmContext, securityContext); - polarisEventListener.onEvent( - new PolarisEvent( - PolarisEventType.AFTER_REPORT_METRICS, - eventMetadataFactory.create(), - new AttributeMap() - .put(EventAttributes.CATALOG_NAME, catalogName) - .put(EventAttributes.NAMESPACE, namespaceObj) - .put(EventAttributes.TABLE_NAME, table) - .put(EventAttributes.REPORT_METRICS_REQUEST, reportMetricsRequest))); - return resp; + // Delegate directly to the underlying service + // Metrics processing will be handled by MetricsProcessor (to be implemented in later PR) + return delegate.reportMetrics( + prefix, namespace, table, reportMetricsRequest, realmContext, securityContext); } @Override diff --git a/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java b/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java index 840b3fb80a..e8207c7f37 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java @@ -78,6 +78,8 @@ import org.apache.polaris.service.ratelimiter.RateLimiterFilterConfiguration; import org.apache.polaris.service.ratelimiter.TokenBucketConfiguration; import org.apache.polaris.service.ratelimiter.TokenBucketFactory; +import org.apache.polaris.service.reporting.MetricsProcessor; +import org.apache.polaris.service.reporting.MetricsProcessorConfiguration; import org.apache.polaris.service.reporting.MetricsReportingConfiguration; import org.apache.polaris.service.reporting.PolarisMetricsReporter; import org.apache.polaris.service.secrets.SecretsManagerConfiguration; @@ -435,4 +437,44 @@ public PolarisMetricsReporter metricsReporter( MetricsReportingConfiguration config, @Any Instance reporters) { return reporters.select(Identifier.Literal.of(config.type())).get(); } + + /** + * Produces the {@link MetricsProcessor} for metrics processing. + * + *

This producer supports the new configuration path: {@code polaris.metrics.processor.type} + * + *

The processor is selected based on the configured type using CDI {@link Identifier} + * annotations. Built-in processors include: + * + *

    + *
  • {@code noop} - Discards all metrics (default) + *
  • {@code logging} - Logs metrics to console + *
  • {@code persistence} - Persists to dedicated database tables + *
+ * + *

Custom processors can be implemented by creating a CDI bean with an {@code @Identifier} + * annotation. + * + *

Note: Implementations will be added in subsequent PRs. This producer provides the CDI + * infrastructure for the metrics processing system. + */ + @Produces + @ApplicationScoped + public MetricsProcessor metricsProcessor( + MetricsProcessorConfiguration config, @Any Instance processors) { + String type = config.type(); + LOGGER.info("Initializing metrics processor: type={}", type); + + try { + MetricsProcessor processor = processors.select(Identifier.Literal.of(type)).get(); + LOGGER.info("Successfully initialized metrics processor: {}", type); + return processor; + } catch (Exception e) { + LOGGER.error( + "Failed to instantiate metrics processor for type '{}': {}. Falling back to noop.", + type, + e.getMessage()); + return processors.select(Identifier.Literal.of("noop")).get(); + } + } } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/EventAttributes.java b/runtime/service/src/main/java/org/apache/polaris/service/events/EventAttributes.java index cae6928de4..7680a90be6 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/EventAttributes.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/EventAttributes.java @@ -231,8 +231,4 @@ private EventAttributes() {} new AttributeKey<>("detach_policy_request", DetachPolicyRequest.class); public static final AttributeKey GET_APPLICABLE_POLICIES_RESPONSE = new AttributeKey<>("get_applicable_policies_response", GetApplicablePoliciesResponse.class); - - // Metrics reporting attributes - public static final AttributeKey REPORT_METRICS_REQUEST = - new AttributeKey<>("report_metrics_request", ReportMetricsRequest.class); } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventType.java b/runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventType.java index bb8345bcee..949b9cb24e 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventType.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventType.java @@ -200,8 +200,4 @@ public enum PolarisEventType { // Rate Limiting Events BEFORE_LIMIT_REQUEST_RATE, - - // Metrics Reporting Events - BEFORE_REPORT_METRICS, - AFTER_REPORT_METRICS, } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessingContext.java b/runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessingContext.java new file mode 100644 index 0000000000..629d43362f --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessingContext.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.reporting; + +import java.util.Optional; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.metrics.MetricsReport; +import org.immutables.value.Value; + +/** + * Context information for metrics processing, providing access to request metadata, security + * context, and tracing information. + * + *

This immutable context object contains all the information needed to process and persist + * Iceberg metrics reports, including catalog and table identifiers, the metrics report itself, and + * associated metadata like principal name, request ID, and OpenTelemetry trace context. + */ +@Value.Immutable +public interface MetricsProcessingContext { + + /** The catalog name where the metrics originated */ + String catalogName(); + + /** The table identifier */ + TableIdentifier tableIdentifier(); + + /** The Iceberg metrics report (ScanReport or CommitReport) */ + MetricsReport metricsReport(); + + /** The realm ID */ + String realmId(); + + /** The catalog ID (internal entity ID) */ + Optional catalogId(); + + /** The principal name who submitted the metrics */ + Optional principalName(); + + /** The request ID for correlation */ + Optional requestId(); + + /** OpenTelemetry trace ID */ + Optional otelTraceId(); + + /** OpenTelemetry span ID */ + Optional otelSpanId(); + + /** Timestamp when metrics were received (milliseconds since epoch) */ + long timestampMs(); +} + diff --git a/runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessor.java b/runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessor.java new file mode 100644 index 0000000000..0071df16cc --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessor.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.reporting; + +/** + * Interface for processing Iceberg metrics reports in Polaris. + * + *

This interface provides a pluggable mechanism for handling metrics reports from Iceberg table + * operations. Implementations can persist metrics to various backends, forward them to external + * systems, or perform custom processing. + * + *

Processors are discovered via CDI using the {@link io.smallrye.common.annotation.Identifier} + * annotation. Custom processors can be implemented and registered by annotating them with {@code + * @ApplicationScoped} and {@code @Identifier("custom-name")}. + * + *

Available built-in processors: + * + *

    + *
  • {@code noop} - Discards all metrics (default) + *
  • {@code logging} - Logs metrics to console for debugging + *
  • {@code persistence} - Persists to dedicated metrics tables + *
+ * + *

Example configuration: + * + *

+ * polaris:
+ *   metrics:
+ *     processor:
+ *       type: persistence
+ * 
+ * + *

Custom implementations should be annotated with: + * + *

+ * {@literal @}ApplicationScoped
+ * {@literal @}Identifier("custom-processor")
+ * public class CustomMetricsProcessor implements MetricsProcessor {
+ *   {@literal @}Override
+ *   public void process(MetricsProcessingContext context) {
+ *     // implementation
+ *   }
+ * }
+ * 
+ * + * @see MetricsProcessingContext + * @see MetricsProcessorConfiguration + */ +public interface MetricsProcessor { + + /** + * Process a metrics report with full context information. + * + *

Implementations should handle exceptions gracefully and not throw exceptions that would + * disrupt the metrics reporting flow. Errors should be logged and metrics about processing + * failures should be emitted. + * + * @param context the complete context for metrics processing + */ + void process(MetricsProcessingContext context); +} + diff --git a/runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessorConfiguration.java b/runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessorConfiguration.java new file mode 100644 index 0000000000..ba55985058 --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessorConfiguration.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.reporting; + +import io.smallrye.config.ConfigMapping; +import io.smallrye.config.WithDefault; +import java.time.Duration; +import java.util.Optional; + +/** + * Configuration for metrics processing in Polaris. + * + *

This configuration controls how Iceberg metrics reports are processed and persisted. The + * processor type determines which implementation is used. + * + *

Example configuration: + * + *

+ * polaris:
+ *   metrics:
+ *     processor:
+ *       type: persistence
+ *       retention:
+ *         enabled: true
+ *         retention-period: P30D
+ *         cleanup-interval: PT6H
+ * 
+ */ +@ConfigMapping(prefix = "polaris.metrics.processor") +public interface MetricsProcessorConfiguration { + + /** + * The type of metrics processor to use. + * + *

Supported built-in values: + * + *

    + *
  • {@code noop} - No processing, discards all metrics (default) + *
  • {@code logging} - Log metrics to console for debugging + *
  • {@code persistence} - Persist to dedicated metrics tables + *
+ * + *

Custom processor types can be specified if a corresponding {@link MetricsProcessor} + * implementation is available with a matching {@link io.smallrye.common.annotation.Identifier}. + * + * @return the processor type identifier + */ + @WithDefault("noop") + String type(); + + /** + * Retention policy configuration for persisted metrics reports. + * + * @return the retention configuration + */ + Optional retention(); + + /** Retention policy configuration for metrics reports. */ + interface Retention { + + /** + * Whether automatic cleanup of old metrics reports is enabled. + * + * @return true if cleanup is enabled + */ + @WithDefault("false") + boolean enabled(); + + /** + * How long to retain metrics reports before they are eligible for deletion. + * + * @return the retention period (default: 30 days) + */ + @WithDefault("P30D") + Duration retentionPeriod(); + + /** + * How often to run the cleanup job. + * + * @return the cleanup interval (default: 6 hours) + */ + @WithDefault("PT6H") + Duration cleanupInterval(); + } +} + diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/ReportMetricsEventTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/ReportMetricsEventTest.java deleted file mode 100644 index 7522347f28..0000000000 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/ReportMetricsEventTest.java +++ /dev/null @@ -1,238 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.service.catalog.iceberg; - -import static org.apache.polaris.service.admin.PolarisAuthzTestBase.SCHEMA; -import static org.assertj.core.api.Assertions.assertThat; - -import jakarta.ws.rs.core.Response; -import java.nio.file.Path; -import java.util.List; -import java.util.Map; -import org.apache.iceberg.catalog.Namespace; -import org.apache.iceberg.expressions.Expressions; -import org.apache.iceberg.metrics.ImmutableScanReport; -import org.apache.iceberg.metrics.ScanMetrics; -import org.apache.iceberg.metrics.ScanMetricsResult; -import org.apache.iceberg.rest.requests.CreateNamespaceRequest; -import org.apache.iceberg.rest.requests.CreateTableRequest; -import org.apache.iceberg.rest.requests.ReportMetricsRequest; -import org.apache.polaris.core.admin.model.Catalog; -import org.apache.polaris.core.admin.model.CatalogProperties; -import org.apache.polaris.core.admin.model.CreateCatalogRequest; -import org.apache.polaris.core.admin.model.FileStorageConfigInfo; -import org.apache.polaris.core.admin.model.StorageConfigInfo; -import org.apache.polaris.service.TestServices; -import org.apache.polaris.service.events.EventAttributes; -import org.apache.polaris.service.events.PolarisEvent; -import org.apache.polaris.service.events.PolarisEventType; -import org.apache.polaris.service.events.listeners.TestPolarisEventListener; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; - -/** - * Unit tests for verifying that reportMetrics() emits BEFORE_REPORT_METRICS and - * AFTER_REPORT_METRICS events. - */ -public class ReportMetricsEventTest { - private static final String NAMESPACE = "test_ns"; - private static final String CATALOG = "test-catalog"; - private static final String TABLE = "test-table"; - - private String catalogLocation; - - @BeforeEach - public void setUp(@TempDir Path tempDir) { - catalogLocation = tempDir.toAbsolutePath().toUri().toString(); - if (catalogLocation.endsWith("/")) { - catalogLocation = catalogLocation.substring(0, catalogLocation.length() - 1); - } - } - - @Test - void testReportMetricsEmitsBeforeAndAfterEventsWhenEnabled() { - // Create test services with ENABLE_METRICS_EVENT_EMISSION enabled - TestServices testServices = createTestServicesWithMetricsEmissionEnabled(true); - createCatalogAndNamespace(testServices); - createTable(testServices, TABLE); - - // Create a ScanReport for testing - ImmutableScanReport scanReport = - ImmutableScanReport.builder() - .schemaId(0) - .tableName(NAMESPACE + "." + TABLE) - .snapshotId(100L) - .addProjectedFieldIds(1) - .addProjectedFieldNames("id") - .filter(Expressions.alwaysTrue()) - .scanMetrics(ScanMetricsResult.fromScanMetrics(ScanMetrics.noop())) - .build(); - - ReportMetricsRequest request = ReportMetricsRequest.of(scanReport); - - // Call reportMetrics - try (Response response = - testServices - .restApi() - .reportMetrics( - CATALOG, - NAMESPACE, - TABLE, - request, - testServices.realmContext(), - testServices.securityContext())) { - assertThat(response.getStatus()).isEqualTo(Response.Status.NO_CONTENT.getStatusCode()); - } - - // Verify that BEFORE_REPORT_METRICS and AFTER_REPORT_METRICS events were emitted - TestPolarisEventListener testEventListener = - (TestPolarisEventListener) testServices.polarisEventListener(); - - PolarisEvent beforeEvent = testEventListener.getLatest(PolarisEventType.BEFORE_REPORT_METRICS); - assertThat(beforeEvent).isNotNull(); - assertThat(beforeEvent.attributes().getRequired(EventAttributes.CATALOG_NAME)) - .isEqualTo(CATALOG); - assertThat(beforeEvent.attributes().getRequired(EventAttributes.NAMESPACE)) - .isEqualTo(Namespace.of(NAMESPACE)); - assertThat(beforeEvent.attributes().getRequired(EventAttributes.TABLE_NAME)).isEqualTo(TABLE); - assertThat(beforeEvent.attributes().getRequired(EventAttributes.REPORT_METRICS_REQUEST)) - .isNotNull(); - - PolarisEvent afterEvent = testEventListener.getLatest(PolarisEventType.AFTER_REPORT_METRICS); - assertThat(afterEvent).isNotNull(); - assertThat(afterEvent.attributes().getRequired(EventAttributes.CATALOG_NAME)) - .isEqualTo(CATALOG); - assertThat(afterEvent.attributes().getRequired(EventAttributes.NAMESPACE)) - .isEqualTo(Namespace.of(NAMESPACE)); - assertThat(afterEvent.attributes().getRequired(EventAttributes.TABLE_NAME)).isEqualTo(TABLE); - assertThat(afterEvent.attributes().getRequired(EventAttributes.REPORT_METRICS_REQUEST)) - .isNotNull(); - } - - @Test - void testReportMetricsDoesNotEmitEventsWhenDisabled() { - // Create test services with ENABLE_METRICS_EVENT_EMISSION disabled (default) - TestServices testServices = createTestServicesWithMetricsEmissionEnabled(false); - createCatalogAndNamespace(testServices); - createTable(testServices, TABLE); - - // Create a ScanReport for testing - ImmutableScanReport scanReport = - ImmutableScanReport.builder() - .schemaId(0) - .tableName(NAMESPACE + "." + TABLE) - .snapshotId(100L) - .addProjectedFieldIds(1) - .addProjectedFieldNames("id") - .filter(Expressions.alwaysTrue()) - .scanMetrics(ScanMetricsResult.fromScanMetrics(ScanMetrics.noop())) - .build(); - - ReportMetricsRequest request = ReportMetricsRequest.of(scanReport); - - // Call reportMetrics - try (Response response = - testServices - .restApi() - .reportMetrics( - CATALOG, - NAMESPACE, - TABLE, - request, - testServices.realmContext(), - testServices.securityContext())) { - assertThat(response.getStatus()).isEqualTo(Response.Status.NO_CONTENT.getStatusCode()); - } - - // Verify that BEFORE_REPORT_METRICS and AFTER_REPORT_METRICS events were NOT emitted - TestPolarisEventListener testEventListener = - (TestPolarisEventListener) testServices.polarisEventListener(); - - assertThat(testEventListener.hasEvent(PolarisEventType.BEFORE_REPORT_METRICS)).isFalse(); - assertThat(testEventListener.hasEvent(PolarisEventType.AFTER_REPORT_METRICS)).isFalse(); - } - - private TestServices createTestServicesWithMetricsEmissionEnabled(boolean enabled) { - Map config = - Map.of( - "ALLOW_INSECURE_STORAGE_TYPES", - "true", - "SUPPORTED_CATALOG_STORAGE_TYPES", - List.of("FILE"), - "ENABLE_METRICS_EVENT_EMISSION", - String.valueOf(enabled)); - return TestServices.builder().config(config).withEventDelegator(true).build(); - } - - private void createCatalogAndNamespace(TestServices services) { - CatalogProperties.Builder propertiesBuilder = - CatalogProperties.builder() - .setDefaultBaseLocation(String.format("%s/%s", catalogLocation, CATALOG)); - - StorageConfigInfo config = - FileStorageConfigInfo.builder() - .setStorageType(StorageConfigInfo.StorageTypeEnum.FILE) - .build(); - Catalog catalogObject = - new Catalog( - Catalog.TypeEnum.INTERNAL, CATALOG, propertiesBuilder.build(), 0L, 0L, 1, config); - try (Response response = - services - .catalogsApi() - .createCatalog( - new CreateCatalogRequest(catalogObject), - services.realmContext(), - services.securityContext())) { - assertThat(response.getStatus()).isEqualTo(Response.Status.CREATED.getStatusCode()); - } - - CreateNamespaceRequest createNamespaceRequest = - CreateNamespaceRequest.builder().withNamespace(Namespace.of(NAMESPACE)).build(); - try (Response response = - services - .restApi() - .createNamespace( - CATALOG, - createNamespaceRequest, - services.realmContext(), - services.securityContext())) { - assertThat(response.getStatus()).isEqualTo(Response.Status.OK.getStatusCode()); - } - } - - private void createTable(TestServices services, String tableName) { - CreateTableRequest createTableRequest = - CreateTableRequest.builder() - .withName(tableName) - .withLocation( - String.format("%s/%s/%s/%s", catalogLocation, CATALOG, NAMESPACE, tableName)) - .withSchema(SCHEMA) - .build(); - services - .restApi() - .createTable( - CATALOG, - NAMESPACE, - createTableRequest, - null, - services.realmContext(), - services.securityContext()); - } -} From aa43beb21db47935d067dbd1f82c275e91d829f5 Mon Sep 17 00:00:00 2001 From: Anand Kumar Sankaran Date: Mon, 19 Jan 2026 09:49:41 -0800 Subject: [PATCH 03/13] Removed internal document --- .polaris-work-notes.md | 18 ------------------ 1 file changed, 18 deletions(-) delete mode 100644 .polaris-work-notes.md diff --git a/.polaris-work-notes.md b/.polaris-work-notes.md deleted file mode 100644 index cbd906acff..0000000000 --- a/.polaris-work-notes.md +++ /dev/null @@ -1,18 +0,0 @@ -# Polaris Work Notes - -## Active Branches - -### feat-3337-rest-catalog-metrics-table-merged -- **Remote**: `obelix74/polaris` (origin) -- **PR**: #3385 -- **Description**: Add metrics persistence with dual storage strategy for Iceberg table operations -- **Last Updated**: 2026-01-17 -- **Notes**: - - This branch is tracking `origin/feat-3337-rest-catalog-metrics-table-merged` - - Fixed failing tests by adding `ENABLE_METRICS_EVENT_EMISSION` feature flag - - To push: `git push origin feat-3337-rest-catalog-metrics-table-merged` - -## Git Remotes -- **origin**: https://github.com/obelix74/polaris.git (your fork) -- **upstream**: https://github.com/apache/polaris.git (Apache upstream) - From 489a140b57ea05e473ccc72750b330982e780b34 Mon Sep 17 00:00:00 2001 From: Anand Kumar Sankaran Date: Mon, 19 Jan 2026 09:50:48 -0800 Subject: [PATCH 04/13] docs: Update CHANGELOG for metrics processor configuration - Remove ENABLE_METRICS_EVENT_EMISSION feature flag entry - Add polaris.metrics.processor.type configuration property --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 63cd7b733c..b1e96ee372 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,7 +55,7 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti ### New Features -- Added `ENABLE_METRICS_EVENT_EMISSION` feature flag (default: false) to control the emission of `BEFORE_REPORT_METRICS` and `AFTER_REPORT_METRICS` events when the Iceberg REST catalog API's `reportMetrics()` method is called. When enabled, event listeners can receive metrics report data for use cases like audit logging and metrics persistence. Can be configured via `polaris.features."ENABLE_METRICS_EVENT_EMISSION"=true`. +- Added `polaris.metrics.processor.type` configuration property to control metrics processing. Supports CDI-based processor selection via `@Identifier` annotations. Default value is `noop`. Processor implementations will be added in subsequent releases. - Added `--no-sts` flag to CLI to support S3-compatible storage systems that do not have Security Token Service available. - Support credential vending for federated catalogs. `ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING` (default: true) was added to toggle this feature. - Enhanced catalog federation with SigV4 authentication support, additional authentication types for credential vending, and location-based access restrictions to block credential vending for remote tables outside allowed location lists. From ea47412dd5401dd55910f3619c01e6653724293e Mon Sep 17 00:00:00 2001 From: Anand Kumar Sankaran Date: Mon, 19 Jan 2026 10:50:40 -0800 Subject: [PATCH 05/13] SpotlessApply --- .../org/apache/polaris/core/config/FeatureConfiguration.java | 1 - .../iceberg/IcebergRestCatalogEventServiceDelegator.java | 1 - .../org/apache/polaris/service/events/EventAttributes.java | 1 - .../polaris/service/reporting/MetricsProcessingContext.java | 1 - .../apache/polaris/service/reporting/MetricsProcessor.java | 5 ++--- .../service/reporting/MetricsProcessorConfiguration.java | 1 - 6 files changed, 2 insertions(+), 8 deletions(-) diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java index ef6ef692fb..1eb121c498 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java @@ -516,5 +516,4 @@ public static void enforceFeatureEnabledOrThrow( + "Helps prevent thundering herd when multiple requests fail simultaneously.") .defaultValue(0.5) .buildFeatureConfiguration(); - } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java index 34eacecc2b..7d0c9be303 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java @@ -41,7 +41,6 @@ import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.rest.responses.LoadViewResponse; import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; -import org.apache.polaris.core.config.FeatureConfiguration; import org.apache.polaris.core.config.RealmConfig; import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.service.catalog.CatalogPrefixParser; diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/EventAttributes.java b/runtime/service/src/main/java/org/apache/polaris/service/events/EventAttributes.java index 7680a90be6..7c62f84c31 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/EventAttributes.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/EventAttributes.java @@ -28,7 +28,6 @@ import org.apache.iceberg.rest.requests.CreateViewRequest; import org.apache.iceberg.rest.requests.RegisterTableRequest; import org.apache.iceberg.rest.requests.RenameTableRequest; -import org.apache.iceberg.rest.requests.ReportMetricsRequest; import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest; import org.apache.iceberg.rest.requests.UpdateTableRequest; import org.apache.iceberg.rest.responses.ConfigResponse; diff --git a/runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessingContext.java b/runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessingContext.java index 629d43362f..71b50eb6d8 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessingContext.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessingContext.java @@ -64,4 +64,3 @@ public interface MetricsProcessingContext { /** Timestamp when metrics were received (milliseconds since epoch) */ long timestampMs(); } - diff --git a/runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessor.java b/runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessor.java index 0071df16cc..83ddcced28 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessor.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessor.java @@ -26,8 +26,8 @@ * systems, or perform custom processing. * *

Processors are discovered via CDI using the {@link io.smallrye.common.annotation.Identifier} - * annotation. Custom processors can be implemented and registered by annotating them with {@code - * @ApplicationScoped} and {@code @Identifier("custom-name")}. + * annotation. Custom processors can be implemented and registered by annotating them with + * {@code @ApplicationScoped} and {@code @Identifier("custom-name")}. * *

Available built-in processors: * @@ -75,4 +75,3 @@ public interface MetricsProcessor { */ void process(MetricsProcessingContext context); } - diff --git a/runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessorConfiguration.java b/runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessorConfiguration.java index ba55985058..1e3d6668e3 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessorConfiguration.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessorConfiguration.java @@ -99,4 +99,3 @@ interface Retention { Duration cleanupInterval(); } } - From 772e6fb55458f06d15c1da099cb5b82b00ab7f3b Mon Sep 17 00:00:00 2001 From: Anand Kumar Sankaran Date: Mon, 19 Jan 2026 15:32:20 -0800 Subject: [PATCH 06/13] Review comments --- CHANGELOG.md | 1 - .../iceberg/IcebergCatalogAdapter.java | 3 +- ...ebergRestCatalogEventServiceDelegator.java | 3 +- .../service/config/ServiceProducers.java | 42 -------- .../reporting/DefaultMetricsReporter.java | 3 +- .../reporting/MetricsProcessingContext.java | 66 ------------ .../service/reporting/MetricsProcessor.java | 77 ------------- .../MetricsProcessorConfiguration.java | 101 ------------------ .../reporting/PolarisMetricsReporter.java | 3 +- .../reporting/DefaultMetricsReporterTest.java | 2 +- 10 files changed, 8 insertions(+), 293 deletions(-) delete mode 100644 runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessingContext.java delete mode 100644 runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessor.java delete mode 100644 runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessorConfiguration.java diff --git a/CHANGELOG.md b/CHANGELOG.md index b1e96ee372..93e51deab4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,7 +55,6 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti ### New Features -- Added `polaris.metrics.processor.type` configuration property to control metrics processing. Supports CDI-based processor selection via `@Identifier` annotations. Default value is `noop`. Processor implementations will be added in subsequent releases. - Added `--no-sts` flag to CLI to support S3-compatible storage systems that do not have Security Token Service available. - Support credential vending for federated catalogs. `ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING` (default: true) was added to toggle this feature. - Enhanced catalog federation with SigV4 authentication support, additional authentication types for credential vending, and location-based access restrictions to block credential vending for remote tables outside allowed location lists. diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java index 6c30afb9e7..889e148f91 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java @@ -722,7 +722,8 @@ public Response reportMetrics( Namespace ns = decodeNamespace(namespace); TableIdentifier tableIdentifier = TableIdentifier.of(ns, RESTUtil.decodeString(table)); - metricsReporter.reportMetric(catalogName, tableIdentifier, reportMetricsRequest.report()); + metricsReporter.reportMetric( + catalogName, tableIdentifier, reportMetricsRequest.report(), System.currentTimeMillis()); return Response.status(Response.Status.NO_CONTENT).build(); } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java index 7d0c9be303..b1ffa35f0b 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java @@ -809,8 +809,7 @@ public Response reportMetrics( ReportMetricsRequest reportMetricsRequest, RealmContext realmContext, SecurityContext securityContext) { - // Delegate directly to the underlying service - // Metrics processing will be handled by MetricsProcessor (to be implemented in later PR) + // Delegate directly to the underlying service. return delegate.reportMetrics( prefix, namespace, table, reportMetricsRequest, realmContext, securityContext); } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java b/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java index e8207c7f37..840b3fb80a 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java @@ -78,8 +78,6 @@ import org.apache.polaris.service.ratelimiter.RateLimiterFilterConfiguration; import org.apache.polaris.service.ratelimiter.TokenBucketConfiguration; import org.apache.polaris.service.ratelimiter.TokenBucketFactory; -import org.apache.polaris.service.reporting.MetricsProcessor; -import org.apache.polaris.service.reporting.MetricsProcessorConfiguration; import org.apache.polaris.service.reporting.MetricsReportingConfiguration; import org.apache.polaris.service.reporting.PolarisMetricsReporter; import org.apache.polaris.service.secrets.SecretsManagerConfiguration; @@ -437,44 +435,4 @@ public PolarisMetricsReporter metricsReporter( MetricsReportingConfiguration config, @Any Instance reporters) { return reporters.select(Identifier.Literal.of(config.type())).get(); } - - /** - * Produces the {@link MetricsProcessor} for metrics processing. - * - *

This producer supports the new configuration path: {@code polaris.metrics.processor.type} - * - *

The processor is selected based on the configured type using CDI {@link Identifier} - * annotations. Built-in processors include: - * - *

    - *
  • {@code noop} - Discards all metrics (default) - *
  • {@code logging} - Logs metrics to console - *
  • {@code persistence} - Persists to dedicated database tables - *
- * - *

Custom processors can be implemented by creating a CDI bean with an {@code @Identifier} - * annotation. - * - *

Note: Implementations will be added in subsequent PRs. This producer provides the CDI - * infrastructure for the metrics processing system. - */ - @Produces - @ApplicationScoped - public MetricsProcessor metricsProcessor( - MetricsProcessorConfiguration config, @Any Instance processors) { - String type = config.type(); - LOGGER.info("Initializing metrics processor: type={}", type); - - try { - MetricsProcessor processor = processors.select(Identifier.Literal.of(type)).get(); - LOGGER.info("Successfully initialized metrics processor: {}", type); - return processor; - } catch (Exception e) { - LOGGER.error( - "Failed to instantiate metrics processor for type '{}': {}. Falling back to noop.", - type, - e.getMessage()); - return processors.select(Identifier.Literal.of("noop")).get(); - } - } } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/reporting/DefaultMetricsReporter.java b/runtime/service/src/main/java/org/apache/polaris/service/reporting/DefaultMetricsReporter.java index 5c7b4934a9..4fde1c84e0 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/reporting/DefaultMetricsReporter.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/reporting/DefaultMetricsReporter.java @@ -46,7 +46,8 @@ public DefaultMetricsReporter() { } @Override - public void reportMetric(String catalogName, TableIdentifier table, MetricsReport metricsReport) { + public void reportMetric( + String catalogName, TableIdentifier table, MetricsReport metricsReport, long timestampMs) { reportConsumer.accept(catalogName, table, metricsReport); } } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessingContext.java b/runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessingContext.java deleted file mode 100644 index 71b50eb6d8..0000000000 --- a/runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessingContext.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.service.reporting; - -import java.util.Optional; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.metrics.MetricsReport; -import org.immutables.value.Value; - -/** - * Context information for metrics processing, providing access to request metadata, security - * context, and tracing information. - * - *

This immutable context object contains all the information needed to process and persist - * Iceberg metrics reports, including catalog and table identifiers, the metrics report itself, and - * associated metadata like principal name, request ID, and OpenTelemetry trace context. - */ -@Value.Immutable -public interface MetricsProcessingContext { - - /** The catalog name where the metrics originated */ - String catalogName(); - - /** The table identifier */ - TableIdentifier tableIdentifier(); - - /** The Iceberg metrics report (ScanReport or CommitReport) */ - MetricsReport metricsReport(); - - /** The realm ID */ - String realmId(); - - /** The catalog ID (internal entity ID) */ - Optional catalogId(); - - /** The principal name who submitted the metrics */ - Optional principalName(); - - /** The request ID for correlation */ - Optional requestId(); - - /** OpenTelemetry trace ID */ - Optional otelTraceId(); - - /** OpenTelemetry span ID */ - Optional otelSpanId(); - - /** Timestamp when metrics were received (milliseconds since epoch) */ - long timestampMs(); -} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessor.java b/runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessor.java deleted file mode 100644 index 83ddcced28..0000000000 --- a/runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessor.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.service.reporting; - -/** - * Interface for processing Iceberg metrics reports in Polaris. - * - *

This interface provides a pluggable mechanism for handling metrics reports from Iceberg table - * operations. Implementations can persist metrics to various backends, forward them to external - * systems, or perform custom processing. - * - *

Processors are discovered via CDI using the {@link io.smallrye.common.annotation.Identifier} - * annotation. Custom processors can be implemented and registered by annotating them with - * {@code @ApplicationScoped} and {@code @Identifier("custom-name")}. - * - *

Available built-in processors: - * - *

    - *
  • {@code noop} - Discards all metrics (default) - *
  • {@code logging} - Logs metrics to console for debugging - *
  • {@code persistence} - Persists to dedicated metrics tables - *
- * - *

Example configuration: - * - *

- * polaris:
- *   metrics:
- *     processor:
- *       type: persistence
- * 
- * - *

Custom implementations should be annotated with: - * - *

- * {@literal @}ApplicationScoped
- * {@literal @}Identifier("custom-processor")
- * public class CustomMetricsProcessor implements MetricsProcessor {
- *   {@literal @}Override
- *   public void process(MetricsProcessingContext context) {
- *     // implementation
- *   }
- * }
- * 
- * - * @see MetricsProcessingContext - * @see MetricsProcessorConfiguration - */ -public interface MetricsProcessor { - - /** - * Process a metrics report with full context information. - * - *

Implementations should handle exceptions gracefully and not throw exceptions that would - * disrupt the metrics reporting flow. Errors should be logged and metrics about processing - * failures should be emitted. - * - * @param context the complete context for metrics processing - */ - void process(MetricsProcessingContext context); -} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessorConfiguration.java b/runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessorConfiguration.java deleted file mode 100644 index 1e3d6668e3..0000000000 --- a/runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsProcessorConfiguration.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.service.reporting; - -import io.smallrye.config.ConfigMapping; -import io.smallrye.config.WithDefault; -import java.time.Duration; -import java.util.Optional; - -/** - * Configuration for metrics processing in Polaris. - * - *

This configuration controls how Iceberg metrics reports are processed and persisted. The - * processor type determines which implementation is used. - * - *

Example configuration: - * - *

- * polaris:
- *   metrics:
- *     processor:
- *       type: persistence
- *       retention:
- *         enabled: true
- *         retention-period: P30D
- *         cleanup-interval: PT6H
- * 
- */ -@ConfigMapping(prefix = "polaris.metrics.processor") -public interface MetricsProcessorConfiguration { - - /** - * The type of metrics processor to use. - * - *

Supported built-in values: - * - *

    - *
  • {@code noop} - No processing, discards all metrics (default) - *
  • {@code logging} - Log metrics to console for debugging - *
  • {@code persistence} - Persist to dedicated metrics tables - *
- * - *

Custom processor types can be specified if a corresponding {@link MetricsProcessor} - * implementation is available with a matching {@link io.smallrye.common.annotation.Identifier}. - * - * @return the processor type identifier - */ - @WithDefault("noop") - String type(); - - /** - * Retention policy configuration for persisted metrics reports. - * - * @return the retention configuration - */ - Optional retention(); - - /** Retention policy configuration for metrics reports. */ - interface Retention { - - /** - * Whether automatic cleanup of old metrics reports is enabled. - * - * @return true if cleanup is enabled - */ - @WithDefault("false") - boolean enabled(); - - /** - * How long to retain metrics reports before they are eligible for deletion. - * - * @return the retention period (default: 30 days) - */ - @WithDefault("P30D") - Duration retentionPeriod(); - - /** - * How often to run the cleanup job. - * - * @return the cleanup interval (default: 6 hours) - */ - @WithDefault("PT6H") - Duration cleanupInterval(); - } -} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java b/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java index 7ffd84f4d8..b20cbc73af 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java @@ -22,5 +22,6 @@ import org.apache.iceberg.metrics.MetricsReport; public interface PolarisMetricsReporter { - public void reportMetric(String catalogName, TableIdentifier table, MetricsReport metricsReport); + void reportMetric( + String catalogName, TableIdentifier table, MetricsReport metricsReport, long timestampMs); } diff --git a/runtime/service/src/test/java/org/apache/polaris/service/reporting/DefaultMetricsReporterTest.java b/runtime/service/src/test/java/org/apache/polaris/service/reporting/DefaultMetricsReporterTest.java index dfdde0f3ef..2d73687caa 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/reporting/DefaultMetricsReporterTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/reporting/DefaultMetricsReporterTest.java @@ -37,7 +37,7 @@ void testLogging() { TableIdentifier table = TableIdentifier.of("testNamespace", "testTable"); MetricsReport metricsReport = mock(MetricsReport.class); - reporter.reportMetric(warehouse, table, metricsReport); + reporter.reportMetric(warehouse, table, metricsReport, 0L); verify(mockConsumer).accept(warehouse, table, metricsReport); } From f3bd22e512c0b6daba16799e3374f185358cd306 Mon Sep 17 00:00:00 2001 From: Anand Kumar Sankaran Date: Mon, 19 Jan 2026 15:52:14 -0800 Subject: [PATCH 07/13] Review comments --- CHANGELOG.md | 3 + .../reporting/DefaultMetricsReporter.java | 31 ++++++++-- .../reporting/PolarisMetricsReporter.java | 60 +++++++++++++++++++ .../reporting/DefaultMetricsReporterTest.java | 9 +-- 4 files changed, 93 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 93e51deab4..464af46873 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,6 +52,9 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti - The EclipseLink Persistence implementation has been completely removed. - The default request ID header name has changed from `Polaris-Request-Id` to `X-Request-ID`. - The (Before/After)CommitTableEvent has been removed. +- The `PolarisMetricsReporter.reportMetric()` method signature has been extended to include a + `timestampMs` parameter. A backward-compatible default method is provided for existing + implementations, but custom implementations should migrate to the new signature. ### New Features diff --git a/runtime/service/src/main/java/org/apache/polaris/service/reporting/DefaultMetricsReporter.java b/runtime/service/src/main/java/org/apache/polaris/service/reporting/DefaultMetricsReporter.java index 4fde1c84e0..baeab7ff38 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/reporting/DefaultMetricsReporter.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/reporting/DefaultMetricsReporter.java @@ -21,33 +21,52 @@ import com.google.common.annotations.VisibleForTesting; import io.smallrye.common.annotation.Identifier; import jakarta.enterprise.context.ApplicationScoped; -import org.apache.commons.lang3.function.TriConsumer; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.metrics.MetricsReport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Default implementation of {@link PolarisMetricsReporter} that logs metrics to the configured + * logger. + * + *

This implementation is selected when {@code polaris.iceberg-metrics.reporting.type} is set to + * {@code "default"} (the default value). + * + *

By default, logging is disabled. To enable metrics logging, set the logger level for {@code + * org.apache.polaris.service.reporting} to {@code INFO} in your logging configuration. + * + * @see PolarisMetricsReporter + */ @ApplicationScoped @Identifier("default") public class DefaultMetricsReporter implements PolarisMetricsReporter { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultMetricsReporter.class); - private final TriConsumer reportConsumer; + private final QuadConsumer reportConsumer; + + /** Functional interface for consuming metrics reports with timestamp. */ + @FunctionalInterface + interface QuadConsumer { + void accept(T1 t1, T2 t2, T3 t3, T4 t4); + } + /** Creates a new DefaultMetricsReporter that logs metrics to the class logger. */ public DefaultMetricsReporter() { this( - (catalogName, table, metricsReport) -> - LOGGER.info("{}.{}: {}", catalogName, table, metricsReport)); + (catalogName, table, metricsReport, timestampMs) -> + LOGGER.info("{}.{} (ts={}): {}", catalogName, table, timestampMs, metricsReport)); } @VisibleForTesting - DefaultMetricsReporter(TriConsumer reportConsumer) { + DefaultMetricsReporter( + QuadConsumer reportConsumer) { this.reportConsumer = reportConsumer; } @Override public void reportMetric( String catalogName, TableIdentifier table, MetricsReport metricsReport, long timestampMs) { - reportConsumer.accept(catalogName, table, metricsReport); + reportConsumer.accept(catalogName, table, metricsReport, timestampMs); } } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java b/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java index b20cbc73af..6dda5c8636 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java @@ -21,7 +21,67 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.metrics.MetricsReport; +/** + * SPI interface for reporting Iceberg metrics received by Polaris. + * + *

Implementations can be used to send metrics to external systems for analysis and monitoring. + * Custom implementations must be annotated with {@link + * jakarta.enterprise.context.ApplicationScoped @ApplicationScoped} and {@link + * io.smallrye.common.annotation.Identifier @Identifier("my-reporter-type")} for CDI discovery. + * + *

The implementation to use is selected via the {@code polaris.iceberg-metrics.reporting.type} + * configuration property, which defaults to {@code "default"}. + * + *

Custom implementations that need access to request-scoped context (such as realm information + * or principal details) should inject the appropriate CDI beans (e.g., {@code RealmContext}, {@code + * PolarisPrincipalHolder}) rather than expecting this data to be passed as parameters. + * + *

Example implementation: + * + *

{@code
+ * @ApplicationScoped
+ * @Identifier("custom")
+ * public class CustomMetricsReporter implements PolarisMetricsReporter {
+ *
+ *   @Inject RealmContext realmContext;
+ *
+ *   @Override
+ *   public void reportMetric(
+ *       String catalogName, TableIdentifier table, MetricsReport metricsReport, long timestampMs) {
+ *     // Send metrics to external system
+ *   }
+ * }
+ * }
+ * + * @see DefaultMetricsReporter + * @see MetricsReportingConfiguration + */ public interface PolarisMetricsReporter { + + /** + * Reports an Iceberg metrics report for a specific table. + * + * @param catalogName the name of the catalog containing the table + * @param table the identifier of the table the metrics are for + * @param metricsReport the Iceberg metrics report (e.g., {@link + * org.apache.iceberg.metrics.ScanReport} or {@link org.apache.iceberg.metrics.CommitReport}) + * @param timestampMs the timestamp in milliseconds when the metrics were received + */ void reportMetric( String catalogName, TableIdentifier table, MetricsReport metricsReport, long timestampMs); + + /** + * Reports an Iceberg metrics report for a specific table. + * + * @param catalogName the name of the catalog containing the table + * @param table the identifier of the table the metrics are for + * @param metricsReport the Iceberg metrics report + * @deprecated Use {@link #reportMetric(String, TableIdentifier, MetricsReport, long)} instead. + * This method is provided for backward compatibility and will be removed in a future release. + */ + @Deprecated + default void reportMetric( + String catalogName, TableIdentifier table, MetricsReport metricsReport) { + reportMetric(catalogName, table, metricsReport, System.currentTimeMillis()); + } } diff --git a/runtime/service/src/test/java/org/apache/polaris/service/reporting/DefaultMetricsReporterTest.java b/runtime/service/src/test/java/org/apache/polaris/service/reporting/DefaultMetricsReporterTest.java index 2d73687caa..9d28fc4d73 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/reporting/DefaultMetricsReporterTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/reporting/DefaultMetricsReporterTest.java @@ -21,7 +21,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import org.apache.commons.lang3.function.TriConsumer; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.metrics.MetricsReport; import org.junit.jupiter.api.Test; @@ -31,14 +30,16 @@ public class DefaultMetricsReporterTest { @Test void testLogging() { @SuppressWarnings("unchecked") - TriConsumer mockConsumer = mock(TriConsumer.class); + DefaultMetricsReporter.QuadConsumer mockConsumer = + mock(DefaultMetricsReporter.QuadConsumer.class); DefaultMetricsReporter reporter = new DefaultMetricsReporter(mockConsumer); String warehouse = "testWarehouse"; TableIdentifier table = TableIdentifier.of("testNamespace", "testTable"); MetricsReport metricsReport = mock(MetricsReport.class); + long timestampMs = 1234567890L; - reporter.reportMetric(warehouse, table, metricsReport, 0L); + reporter.reportMetric(warehouse, table, metricsReport, timestampMs); - verify(mockConsumer).accept(warehouse, table, metricsReport); + verify(mockConsumer).accept(warehouse, table, metricsReport, timestampMs); } } From 194150c53b23dd4dfc815086ff99a2a8c9b21d5f Mon Sep 17 00:00:00 2001 From: Anand Kumar Sankaran Date: Mon, 19 Jan 2026 16:36:31 -0800 Subject: [PATCH 08/13] Added a service producer for SpanContext --- .../service/config/ServiceProducers.java | 42 +++++++++++++++++++ .../reporting/PolarisMetricsReporter.java | 17 +++++--- 2 files changed, 54 insertions(+), 5 deletions(-) diff --git a/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java b/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java index 840b3fb80a..3028670253 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java @@ -19,6 +19,8 @@ package org.apache.polaris.service.config; import io.micrometer.core.instrument.MeterRegistry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; import io.smallrye.common.annotation.Identifier; import io.smallrye.context.SmallRyeManagedExecutor; import jakarta.enterprise.context.ApplicationScoped; @@ -429,6 +431,46 @@ public void closeTaskExecutor(@Disposes @Identifier("task-executor") ManagedExec executor.close(); } + /** + * Produces the current OpenTelemetry {@link SpanContext} for the request. + * + *

This allows custom {@link PolarisMetricsReporter} implementations to access the current + * trace and span IDs for correlation with external monitoring systems. + * + *

Example usage in a custom metrics reporter: + * + *

{@code
+   * @ApplicationScoped
+   * @Identifier("custom")
+   * public class CustomMetricsReporter implements PolarisMetricsReporter {
+   *
+   *   @Inject SpanContext spanContext;
+   *
+   *   @Override
+   *   public void reportMetric(String catalogName, TableIdentifier table,
+   *       MetricsReport metricsReport, long timestampMs) {
+   *     String traceId = spanContext.isValid() ? spanContext.getTraceId() : null;
+   *     String spanId = spanContext.isValid() ? spanContext.getSpanId() : null;
+   *     // Forward metrics with trace correlation...
+   *   }
+   * }
+   * }
+ * + * @return the current span context, or {@link SpanContext#getInvalid()} if no span is active + */ + @Produces + @RequestScoped + public SpanContext currentSpanContext() { + Span currentSpan = Span.current(); + if (currentSpan != null) { + SpanContext spanContext = currentSpan.getSpanContext(); + if (spanContext != null) { + return spanContext; + } + } + return SpanContext.getInvalid(); + } + @Produces @ApplicationScoped public PolarisMetricsReporter metricsReporter( diff --git a/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java b/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java index 6dda5c8636..39e7c2db1d 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java @@ -32,11 +32,16 @@ *

The implementation to use is selected via the {@code polaris.iceberg-metrics.reporting.type} * configuration property, which defaults to {@code "default"}. * - *

Custom implementations that need access to request-scoped context (such as realm information - * or principal details) should inject the appropriate CDI beans (e.g., {@code RealmContext}, {@code - * PolarisPrincipalHolder}) rather than expecting this data to be passed as parameters. + *

Custom implementations that need access to request-scoped context should inject the + * appropriate CDI beans rather than expecting this data to be passed as parameters: * - *

Example implementation: + *

    + *
  • {@code RealmContext} - for realm/tenant information + *
  • {@code io.opentelemetry.api.trace.SpanContext} - for OpenTelemetry trace/span IDs + *
  • {@code PolarisPrincipalHolder} - for authenticated principal information + *
+ * + *

Example implementation with OpenTelemetry correlation: * *

{@code
  * @ApplicationScoped
@@ -44,11 +49,13 @@
  * public class CustomMetricsReporter implements PolarisMetricsReporter {
  *
  *   @Inject RealmContext realmContext;
+ *   @Inject SpanContext spanContext;
  *
  *   @Override
  *   public void reportMetric(
  *       String catalogName, TableIdentifier table, MetricsReport metricsReport, long timestampMs) {
- *     // Send metrics to external system
+ *     String traceId = spanContext.isValid() ? spanContext.getTraceId() : null;
+ *     // Send metrics to external system with trace correlation
  *   }
  * }
  * }
From e4e9f0b5aa80aceb71ceb14a52f3a69135cf4521 Mon Sep 17 00:00:00 2001 From: Anand Kumar Sankaran Date: Tue, 20 Jan 2026 07:56:37 -0800 Subject: [PATCH 09/13] Review comments --- CHANGELOG.md | 5 ++- .../iceberg/IcebergCatalogAdapter.java | 3 +- ...ebergRestCatalogEventServiceDelegator.java | 6 +-- .../service/config/ServiceProducers.java | 42 ------------------- .../reporting/DefaultMetricsReporter.java | 16 ++++--- .../reporting/PolarisMetricsReporter.java | 21 ++++++---- .../reporting/DefaultMetricsReporterTest.java | 11 ++--- .../apache/polaris/service/TestServices.java | 3 +- 8 files changed, 37 insertions(+), 70 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 464af46873..a319457b5e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,8 +53,9 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti - The default request ID header name has changed from `Polaris-Request-Id` to `X-Request-ID`. - The (Before/After)CommitTableEvent has been removed. - The `PolarisMetricsReporter.reportMetric()` method signature has been extended to include a - `timestampMs` parameter. A backward-compatible default method is provided for existing - implementations, but custom implementations should migrate to the new signature. + `receivedTimestamp` parameter of type `java.time.Instant`. A backward-compatible default method + is provided for existing implementations, but custom implementations should migrate to the new + signature. ### New Features diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java index 889e148f91..9d2a95185a 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java @@ -30,6 +30,7 @@ import jakarta.ws.rs.core.HttpHeaders; import jakarta.ws.rs.core.Response; import jakarta.ws.rs.core.SecurityContext; +import java.time.Instant; import java.util.EnumSet; import java.util.Optional; import java.util.function.Function; @@ -723,7 +724,7 @@ public Response reportMetrics( TableIdentifier tableIdentifier = TableIdentifier.of(ns, RESTUtil.decodeString(table)); metricsReporter.reportMetric( - catalogName, tableIdentifier, reportMetricsRequest.report(), System.currentTimeMillis()); + catalogName, tableIdentifier, reportMetricsRequest.report(), Instant.now()); return Response.status(Response.Status.NO_CONTENT).build(); } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java index b1ffa35f0b..09aff7b500 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java @@ -41,7 +41,6 @@ import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.rest.responses.LoadViewResponse; import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; -import org.apache.polaris.core.config.RealmConfig; import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.service.catalog.CatalogPrefixParser; import org.apache.polaris.service.catalog.api.IcebergRestCatalogApiService; @@ -65,7 +64,6 @@ public class IcebergRestCatalogEventServiceDelegator @Inject PolarisEventListener polarisEventListener; @Inject PolarisEventMetadataFactory eventMetadataFactory; @Inject CatalogPrefixParser prefixParser; - @Inject RealmConfig realmConfig; // Constructor for testing - allows manual dependency injection @VisibleForTesting @@ -73,13 +71,11 @@ public IcebergRestCatalogEventServiceDelegator( IcebergCatalogAdapter delegate, PolarisEventListener polarisEventListener, PolarisEventMetadataFactory eventMetadataFactory, - CatalogPrefixParser prefixParser, - RealmConfig realmConfig) { + CatalogPrefixParser prefixParser) { this.delegate = delegate; this.polarisEventListener = polarisEventListener; this.eventMetadataFactory = eventMetadataFactory; this.prefixParser = prefixParser; - this.realmConfig = realmConfig; } // Default constructor for CDI diff --git a/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java b/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java index 3028670253..840b3fb80a 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java @@ -19,8 +19,6 @@ package org.apache.polaris.service.config; import io.micrometer.core.instrument.MeterRegistry; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.SpanContext; import io.smallrye.common.annotation.Identifier; import io.smallrye.context.SmallRyeManagedExecutor; import jakarta.enterprise.context.ApplicationScoped; @@ -431,46 +429,6 @@ public void closeTaskExecutor(@Disposes @Identifier("task-executor") ManagedExec executor.close(); } - /** - * Produces the current OpenTelemetry {@link SpanContext} for the request. - * - *

This allows custom {@link PolarisMetricsReporter} implementations to access the current - * trace and span IDs for correlation with external monitoring systems. - * - *

Example usage in a custom metrics reporter: - * - *

{@code
-   * @ApplicationScoped
-   * @Identifier("custom")
-   * public class CustomMetricsReporter implements PolarisMetricsReporter {
-   *
-   *   @Inject SpanContext spanContext;
-   *
-   *   @Override
-   *   public void reportMetric(String catalogName, TableIdentifier table,
-   *       MetricsReport metricsReport, long timestampMs) {
-   *     String traceId = spanContext.isValid() ? spanContext.getTraceId() : null;
-   *     String spanId = spanContext.isValid() ? spanContext.getSpanId() : null;
-   *     // Forward metrics with trace correlation...
-   *   }
-   * }
-   * }
- * - * @return the current span context, or {@link SpanContext#getInvalid()} if no span is active - */ - @Produces - @RequestScoped - public SpanContext currentSpanContext() { - Span currentSpan = Span.current(); - if (currentSpan != null) { - SpanContext spanContext = currentSpan.getSpanContext(); - if (spanContext != null) { - return spanContext; - } - } - return SpanContext.getInvalid(); - } - @Produces @ApplicationScoped public PolarisMetricsReporter metricsReporter( diff --git a/runtime/service/src/main/java/org/apache/polaris/service/reporting/DefaultMetricsReporter.java b/runtime/service/src/main/java/org/apache/polaris/service/reporting/DefaultMetricsReporter.java index baeab7ff38..eaff0219b4 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/reporting/DefaultMetricsReporter.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/reporting/DefaultMetricsReporter.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import io.smallrye.common.annotation.Identifier; import jakarta.enterprise.context.ApplicationScoped; +import java.time.Instant; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.metrics.MetricsReport; import org.slf4j.Logger; @@ -43,7 +44,7 @@ public class DefaultMetricsReporter implements PolarisMetricsReporter { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultMetricsReporter.class); - private final QuadConsumer reportConsumer; + private final QuadConsumer reportConsumer; /** Functional interface for consuming metrics reports with timestamp. */ @FunctionalInterface @@ -54,19 +55,22 @@ interface QuadConsumer { /** Creates a new DefaultMetricsReporter that logs metrics to the class logger. */ public DefaultMetricsReporter() { this( - (catalogName, table, metricsReport, timestampMs) -> - LOGGER.info("{}.{} (ts={}): {}", catalogName, table, timestampMs, metricsReport)); + (catalogName, table, metricsReport, receivedTimestamp) -> + LOGGER.info("{}.{} (ts={}): {}", catalogName, table, receivedTimestamp, metricsReport)); } @VisibleForTesting DefaultMetricsReporter( - QuadConsumer reportConsumer) { + QuadConsumer reportConsumer) { this.reportConsumer = reportConsumer; } @Override public void reportMetric( - String catalogName, TableIdentifier table, MetricsReport metricsReport, long timestampMs) { - reportConsumer.accept(catalogName, table, metricsReport, timestampMs); + String catalogName, + TableIdentifier table, + MetricsReport metricsReport, + Instant receivedTimestamp) { + reportConsumer.accept(catalogName, table, metricsReport, receivedTimestamp); } } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java b/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java index 39e7c2db1d..90b55d31d5 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java @@ -18,6 +18,7 @@ */ package org.apache.polaris.service.reporting; +import java.time.Instant; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.metrics.MetricsReport; @@ -37,7 +38,8 @@ * *
    *
  • {@code RealmContext} - for realm/tenant information - *
  • {@code io.opentelemetry.api.trace.SpanContext} - for OpenTelemetry trace/span IDs + *
  • {@code io.opentelemetry.api.trace.Span} - for OpenTelemetry trace/span IDs (inject via + * {@code @Inject Span span} or use {@code Span.current()}) *
  • {@code PolarisPrincipalHolder} - for authenticated principal information *
* @@ -49,11 +51,13 @@ * public class CustomMetricsReporter implements PolarisMetricsReporter { * * @Inject RealmContext realmContext; - * @Inject SpanContext spanContext; + * @Inject Span span; // or use Span.current() * * @Override * public void reportMetric( - * String catalogName, TableIdentifier table, MetricsReport metricsReport, long timestampMs) { + * String catalogName, TableIdentifier table, MetricsReport metricsReport, + * Instant receivedTimestamp) { + * SpanContext spanContext = span.getSpanContext(); * String traceId = spanContext.isValid() ? spanContext.getTraceId() : null; * // Send metrics to external system with trace correlation * } @@ -72,10 +76,13 @@ public interface PolarisMetricsReporter { * @param table the identifier of the table the metrics are for * @param metricsReport the Iceberg metrics report (e.g., {@link * org.apache.iceberg.metrics.ScanReport} or {@link org.apache.iceberg.metrics.CommitReport}) - * @param timestampMs the timestamp in milliseconds when the metrics were received + * @param receivedTimestamp the timestamp when the metrics were received by Polaris */ void reportMetric( - String catalogName, TableIdentifier table, MetricsReport metricsReport, long timestampMs); + String catalogName, + TableIdentifier table, + MetricsReport metricsReport, + Instant receivedTimestamp); /** * Reports an Iceberg metrics report for a specific table. @@ -83,12 +90,12 @@ void reportMetric( * @param catalogName the name of the catalog containing the table * @param table the identifier of the table the metrics are for * @param metricsReport the Iceberg metrics report - * @deprecated Use {@link #reportMetric(String, TableIdentifier, MetricsReport, long)} instead. + * @deprecated Use {@link #reportMetric(String, TableIdentifier, MetricsReport, Instant)} instead. * This method is provided for backward compatibility and will be removed in a future release. */ @Deprecated default void reportMetric( String catalogName, TableIdentifier table, MetricsReport metricsReport) { - reportMetric(catalogName, table, metricsReport, System.currentTimeMillis()); + reportMetric(catalogName, table, metricsReport, Instant.now()); } } diff --git a/runtime/service/src/test/java/org/apache/polaris/service/reporting/DefaultMetricsReporterTest.java b/runtime/service/src/test/java/org/apache/polaris/service/reporting/DefaultMetricsReporterTest.java index 9d28fc4d73..8762c3ed74 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/reporting/DefaultMetricsReporterTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/reporting/DefaultMetricsReporterTest.java @@ -21,6 +21,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import java.time.Instant; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.metrics.MetricsReport; import org.junit.jupiter.api.Test; @@ -30,16 +31,16 @@ public class DefaultMetricsReporterTest { @Test void testLogging() { @SuppressWarnings("unchecked") - DefaultMetricsReporter.QuadConsumer mockConsumer = - mock(DefaultMetricsReporter.QuadConsumer.class); + DefaultMetricsReporter.QuadConsumer + mockConsumer = mock(DefaultMetricsReporter.QuadConsumer.class); DefaultMetricsReporter reporter = new DefaultMetricsReporter(mockConsumer); String warehouse = "testWarehouse"; TableIdentifier table = TableIdentifier.of("testNamespace", "testTable"); MetricsReport metricsReport = mock(MetricsReport.class); - long timestampMs = 1234567890L; + Instant receivedTimestamp = Instant.ofEpochMilli(1234567890L); - reporter.reportMetric(warehouse, table, metricsReport, timestampMs); + reporter.reportMetric(warehouse, table, metricsReport, receivedTimestamp); - verify(mockConsumer).accept(warehouse, table, metricsReport, timestampMs); + verify(mockConsumer).accept(warehouse, table, metricsReport, receivedTimestamp); } } diff --git a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java index 3455c55c51..59af4b5a6c 100644 --- a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java +++ b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java @@ -361,8 +361,7 @@ public String getAuthenticationScheme() { catalogService, polarisEventListener, eventMetadataFactory, - new DefaultCatalogPrefixParser(), - realmConfig); + new DefaultCatalogPrefixParser()); finalRestConfigurationService = new IcebergRestConfigurationEventServiceDelegator( catalogService, polarisEventListener, eventMetadataFactory); From d69c08a210915fb55e2e5b69a3317eaa96df70e7 Mon Sep 17 00:00:00 2001 From: Anand Kumar Sankaran Date: Tue, 20 Jan 2026 10:05:33 -0800 Subject: [PATCH 10/13] Review comments about using Clock --- .../service/catalog/iceberg/IcebergCatalogAdapter.java | 9 ++++++--- .../java/org/apache/polaris/service/TestServices.java | 3 ++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java index 9d2a95185a..fb54b5572d 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java @@ -30,7 +30,7 @@ import jakarta.ws.rs.core.HttpHeaders; import jakarta.ws.rs.core.Response; import jakarta.ws.rs.core.SecurityContext; -import java.time.Instant; +import java.time.Clock; import java.util.EnumSet; import java.util.Optional; import java.util.function.Function; @@ -106,6 +106,7 @@ public class IcebergCatalogAdapter private final Instance externalCatalogFactories; private final StorageAccessConfigProvider storageAccessConfigProvider; private final PolarisMetricsReporter metricsReporter; + private final Clock clock; @Inject public IcebergCatalogAdapter( @@ -123,7 +124,8 @@ public IcebergCatalogAdapter( CatalogHandlerUtils catalogHandlerUtils, @Any Instance externalCatalogFactories, StorageAccessConfigProvider storageAccessConfigProvider, - PolarisMetricsReporter metricsReporter) { + PolarisMetricsReporter metricsReporter, + Clock clock) { this.diagnostics = diagnostics; this.realmContext = realmContext; this.callContext = callContext; @@ -140,6 +142,7 @@ public IcebergCatalogAdapter( this.externalCatalogFactories = externalCatalogFactories; this.storageAccessConfigProvider = storageAccessConfigProvider; this.metricsReporter = metricsReporter; + this.clock = clock; } /** @@ -724,7 +727,7 @@ public Response reportMetrics( TableIdentifier tableIdentifier = TableIdentifier.of(ns, RESTUtil.decodeString(table)); metricsReporter.reportMetric( - catalogName, tableIdentifier, reportMetricsRequest.report(), Instant.now()); + catalogName, tableIdentifier, reportMetricsRequest.report(), clock.instant()); return Response.status(Response.Status.NO_CONTENT).build(); } diff --git a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java index 59af4b5a6c..30303121e1 100644 --- a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java +++ b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java @@ -350,7 +350,8 @@ public String getAuthenticationScheme() { catalogHandlerUtils, externalCatalogFactory, storageAccessConfigProvider, - new DefaultMetricsReporter()); + new DefaultMetricsReporter(), + Clock.systemUTC()); // Optionally wrap with event delegator IcebergRestCatalogApiService finalRestCatalogService = catalogService; From f2e23795059f0cd7b8946cab0fcd127e90b9d66a Mon Sep 17 00:00:00 2001 From: Anand Kumar Sankaran Date: Tue, 20 Jan 2026 11:18:17 -0800 Subject: [PATCH 11/13] Review comments. --- CHANGELOG.md | 4 +- ...ebergRestCatalogEventServiceDelegator.java | 1 - .../reporting/PolarisMetricsReporter.java | 48 +------------------ 3 files changed, 3 insertions(+), 50 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a319457b5e..5626e93840 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,9 +53,7 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti - The default request ID header name has changed from `Polaris-Request-Id` to `X-Request-ID`. - The (Before/After)CommitTableEvent has been removed. - The `PolarisMetricsReporter.reportMetric()` method signature has been extended to include a - `receivedTimestamp` parameter of type `java.time.Instant`. A backward-compatible default method - is provided for existing implementations, but custom implementations should migrate to the new - signature. + `receivedTimestamp` parameter of type `java.time.Instant`. ### New Features diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java index 09aff7b500..3a8a35e9e2 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java @@ -805,7 +805,6 @@ public Response reportMetrics( ReportMetricsRequest reportMetricsRequest, RealmContext realmContext, SecurityContext securityContext) { - // Delegate directly to the underlying service. return delegate.reportMetrics( prefix, namespace, table, reportMetricsRequest, realmContext, securityContext); } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java b/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java index 90b55d31d5..bda4a5c922 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java @@ -27,42 +27,13 @@ * *

Implementations can be used to send metrics to external systems for analysis and monitoring. * Custom implementations must be annotated with {@link - * jakarta.enterprise.context.ApplicationScoped @ApplicationScoped} and {@link + * jakarta.enterprise.context.RequestScoped @RequestScoped} and {@link * io.smallrye.common.annotation.Identifier @Identifier("my-reporter-type")} for CDI discovery. * *

The implementation to use is selected via the {@code polaris.iceberg-metrics.reporting.type} * configuration property, which defaults to {@code "default"}. * - *

Custom implementations that need access to request-scoped context should inject the - * appropriate CDI beans rather than expecting this data to be passed as parameters: - * - *

    - *
  • {@code RealmContext} - for realm/tenant information - *
  • {@code io.opentelemetry.api.trace.Span} - for OpenTelemetry trace/span IDs (inject via - * {@code @Inject Span span} or use {@code Span.current()}) - *
  • {@code PolarisPrincipalHolder} - for authenticated principal information - *
- * - *

Example implementation with OpenTelemetry correlation: - * - *

{@code
- * @ApplicationScoped
- * @Identifier("custom")
- * public class CustomMetricsReporter implements PolarisMetricsReporter {
- *
- *   @Inject RealmContext realmContext;
- *   @Inject Span span; // or use Span.current()
- *
- *   @Override
- *   public void reportMetric(
- *       String catalogName, TableIdentifier table, MetricsReport metricsReport,
- *       Instant receivedTimestamp) {
- *     SpanContext spanContext = span.getSpanContext();
- *     String traceId = spanContext.isValid() ? spanContext.getTraceId() : null;
- *     // Send metrics to external system with trace correlation
- *   }
- * }
- * }
+ *

Implementations can inject other CDI beans for context. * * @see DefaultMetricsReporter * @see MetricsReportingConfiguration @@ -83,19 +54,4 @@ void reportMetric( TableIdentifier table, MetricsReport metricsReport, Instant receivedTimestamp); - - /** - * Reports an Iceberg metrics report for a specific table. - * - * @param catalogName the name of the catalog containing the table - * @param table the identifier of the table the metrics are for - * @param metricsReport the Iceberg metrics report - * @deprecated Use {@link #reportMetric(String, TableIdentifier, MetricsReport, Instant)} instead. - * This method is provided for backward compatibility and will be removed in a future release. - */ - @Deprecated - default void reportMetric( - String catalogName, TableIdentifier table, MetricsReport metricsReport) { - reportMetric(catalogName, table, metricsReport, Instant.now()); - } } From 64b52e0a01fd8ac0cb0c24853b1d5b853fb7b4e4 Mon Sep 17 00:00:00 2001 From: Anand Kumar Sankaran Date: Tue, 20 Jan 2026 12:41:58 -0800 Subject: [PATCH 12/13] Review comments. --- .../service/reporting/PolarisMetricsReporter.java | 3 +-- .../events/listeners/TestPolarisEventListener.java | 12 ------------ 2 files changed, 1 insertion(+), 14 deletions(-) diff --git a/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java b/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java index bda4a5c922..b27184d559 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java @@ -26,8 +26,7 @@ * SPI interface for reporting Iceberg metrics received by Polaris. * *

Implementations can be used to send metrics to external systems for analysis and monitoring. - * Custom implementations must be annotated with {@link - * jakarta.enterprise.context.RequestScoped @RequestScoped} and {@link + * Custom implementations can be annotated with appropriate {@code Quarkus} scope and {@link * io.smallrye.common.annotation.Identifier @Identifier("my-reporter-type")} for CDI discovery. * *

The implementation to use is selected via the {@code polaris.iceberg-metrics.reporting.type} diff --git a/runtime/service/src/testFixtures/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java b/runtime/service/src/testFixtures/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java index a0973750b5..11d8a4dae8 100644 --- a/runtime/service/src/testFixtures/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java +++ b/runtime/service/src/testFixtures/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java @@ -44,16 +44,4 @@ public PolarisEvent getLatest(PolarisEventType type) { return latest; } - /** - * Returns the latest event of the specified type, or null if no such event has been recorded. - * This is useful for tests that need to verify no event was emitted. - */ - public PolarisEvent getLatestOrNull(PolarisEventType type) { - return latestEvents.get(type); - } - - /** Returns true if an event of the specified type has been recorded. */ - public boolean hasEvent(PolarisEventType type) { - return latestEvents.containsKey(type); - } } From cbeb62014ff6ee8b256ce99a6f79c139642dd660 Mon Sep 17 00:00:00 2001 From: Anand Kumar Sankaran Date: Tue, 20 Jan 2026 13:10:35 -0800 Subject: [PATCH 13/13] spotlessApply: Remove trailing blank line --- .../service/events/listeners/TestPolarisEventListener.java | 1 - 1 file changed, 1 deletion(-) diff --git a/runtime/service/src/testFixtures/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java b/runtime/service/src/testFixtures/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java index 11d8a4dae8..e23a7a9264 100644 --- a/runtime/service/src/testFixtures/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java +++ b/runtime/service/src/testFixtures/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java @@ -43,5 +43,4 @@ public PolarisEvent getLatest(PolarisEventType type) { } return latest; } - }