Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti
### Breaking changes

- The (Before/After)CommitTableEvent has been removed.
- The `PolarisMetricsReporter.reportMetric()` method signature has been extended to include a `receivedTimestamp` parameter of type `java.time.Instant`.

### New Features

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.SecurityContext;
import java.time.Clock;
import java.util.EnumSet;
import java.util.Optional;
import java.util.function.Function;
Expand Down Expand Up @@ -105,6 +106,7 @@ public class IcebergCatalogAdapter
private final Instance<ExternalCatalogFactory> externalCatalogFactories;
private final StorageAccessConfigProvider storageAccessConfigProvider;
private final PolarisMetricsReporter metricsReporter;
private final Clock clock;

@Inject
public IcebergCatalogAdapter(
Expand All @@ -122,7 +124,8 @@ public IcebergCatalogAdapter(
CatalogHandlerUtils catalogHandlerUtils,
@Any Instance<ExternalCatalogFactory> externalCatalogFactories,
StorageAccessConfigProvider storageAccessConfigProvider,
PolarisMetricsReporter metricsReporter) {
PolarisMetricsReporter metricsReporter,
Clock clock) {
this.diagnostics = diagnostics;
this.realmContext = realmContext;
this.callContext = callContext;
Expand All @@ -139,6 +142,7 @@ public IcebergCatalogAdapter(
this.externalCatalogFactories = externalCatalogFactories;
this.storageAccessConfigProvider = storageAccessConfigProvider;
this.metricsReporter = metricsReporter;
this.clock = clock;
}

/**
Expand Down Expand Up @@ -722,7 +726,8 @@ public Response reportMetrics(
Namespace ns = decodeNamespace(namespace);
TableIdentifier tableIdentifier = TableIdentifier.of(ns, RESTUtil.decodeString(table));

metricsReporter.reportMetric(catalogName, tableIdentifier, reportMetricsRequest.report());
metricsReporter.reportMetric(
catalogName, tableIdentifier, reportMetricsRequest.report(), clock.instant());
return Response.status(Response.Status.NO_CONTENT).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,56 @@
import com.google.common.annotations.VisibleForTesting;
import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.commons.lang3.function.TriConsumer;
import java.time.Instant;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.metrics.MetricsReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Default implementation of {@link PolarisMetricsReporter} that logs metrics to the configured
* logger.
*
* <p>This implementation is selected when {@code polaris.iceberg-metrics.reporting.type} is set to
* {@code "default"} (the default value).
*
* <p>By default, logging is disabled. To enable metrics logging, set the logger level for {@code
* org.apache.polaris.service.reporting} to {@code INFO} in your logging configuration.
*
* @see PolarisMetricsReporter
*/
@ApplicationScoped
@Identifier("default")
public class DefaultMetricsReporter implements PolarisMetricsReporter {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultMetricsReporter.class);

private final TriConsumer<String, TableIdentifier, MetricsReport> reportConsumer;
private final QuadConsumer<String, TableIdentifier, MetricsReport, Instant> reportConsumer;

/** Functional interface for consuming metrics reports with timestamp. */
@FunctionalInterface
interface QuadConsumer<T1, T2, T3, T4> {
void accept(T1 t1, T2 t2, T3 t3, T4 t4);
}

/** Creates a new DefaultMetricsReporter that logs metrics to the class logger. */
public DefaultMetricsReporter() {
this(
(catalogName, table, metricsReport) ->
LOGGER.info("{}.{}: {}", catalogName, table, metricsReport));
(catalogName, table, metricsReport, receivedTimestamp) ->
LOGGER.info("{}.{} (ts={}): {}", catalogName, table, receivedTimestamp, metricsReport));
}

@VisibleForTesting
DefaultMetricsReporter(TriConsumer<String, TableIdentifier, MetricsReport> reportConsumer) {
DefaultMetricsReporter(
QuadConsumer<String, TableIdentifier, MetricsReport, Instant> reportConsumer) {
this.reportConsumer = reportConsumer;
}

@Override
public void reportMetric(String catalogName, TableIdentifier table, MetricsReport metricsReport) {
reportConsumer.accept(catalogName, table, metricsReport);
public void reportMetric(
String catalogName,
TableIdentifier table,
MetricsReport metricsReport,
Instant receivedTimestamp) {
reportConsumer.accept(catalogName, table, metricsReport, receivedTimestamp);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,39 @@
*/
package org.apache.polaris.service.reporting;

import java.time.Instant;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.metrics.MetricsReport;

/**
* SPI interface for reporting Iceberg metrics received by Polaris.
*
* <p>Implementations can be used to send metrics to external systems for analysis and monitoring.
* Custom implementations can be annotated with appropriate {@code Quarkus} scope and {@link
* io.smallrye.common.annotation.Identifier @Identifier("my-reporter-type")} for CDI discovery.
*
* <p>The implementation to use is selected via the {@code polaris.iceberg-metrics.reporting.type}
* configuration property, which defaults to {@code "default"}.
*
* <p>Implementations can inject other CDI beans for context.
*
* @see DefaultMetricsReporter
* @see MetricsReportingConfiguration
*/
public interface PolarisMetricsReporter {
public void reportMetric(String catalogName, TableIdentifier table, MetricsReport metricsReport);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we release this public interface in 1.3 right ? i wonder if we keep this api with default impl to the new method with null or something ?
361b7e9

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@singhpk234 I had a deprecated backward compatible method and the consensus was to remove it since this SPI is not meant to be used by developers outside Polaris.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't fully get it, wdym by developers outside Polaris ? can this not be used downstream projects, my recommendation is to make sure version upgrades are seemless so keep this and add more if we want and just update the default of this work under timestamp null assumption.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me also check previous discussions, can you please point me to that ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

@dimas-b dimas-b Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per our standing evolution guidelines public classes / methods can change in any release. Unlike REST API changes, it is not considered a "major" change for versioning purposes. Essentially java methods are not part of the API surface in the SemVer sense.

We should and do try to make java API changes in a backward -compatible manner when practical.

Regarding this particular PR and java interfaces that are part of an SPI (defined and called by Polaris, implemented by 3rd party plugins), I do not see a practical way to evolve them in a backward-compatible manner without causing excessive maintenance burden in Polaris code.

In this case, Polaris would have to define a new interface for the new method signature and perform instanceof checks at runtime in order to decide whether to call the old or the new method. I do believe it would be an overkill at the current stage of the project (still evolving actively). It is not too hard for downstream implementations to adjust code for the newly added parameter.

That said, I'd like to propose moving the SPI evolution discussion to the dev ML as it is a big and complex topic.

@singhpk234 : please clarify whether you consider this comment thread a blocker for merging (or not).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dimas-b can we not add the api in the same interface and add the default impl of old api call this null clock, i would like to understand this more if this is possible. If its not possible i am supportive of the change !

@singhpk234 : please clarify whether you consider this comment thread a blocker for merging (or not).

thanks for asking the clarification i believe anything is thats not marked as 'nit' / 'not blocker' / 'not' is expected to be resolved before merging (per here) , i really appreciate you checking in !

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we not add the api in the same interface and add the default impl of old api call this null clock [...]

I do not see a point in adding a default impl for the old method alone. Existing implementations will have overrides for it already (javac will not let them miss that).

We could add a default impl. for the new method and redirect to the old one (without the Instant). This will allow existing implementations to compile without changes. However, this creates uncertainty for the implementor regarding which method should do the real work. Javadoc helps, but adds cognitive load. We could add default to both methods and deprecate the old one, but this will add "cruft" to the interface definition, when, I believe, adapting to the new interface in downstream projects is very easy.

Please note that providing a custom implementation for this interface requires a downstream build. So all upgrades will go through local builds and CI (assuming normal software engineering practices), where the need for adjustments will be become apparent.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I uptook 1.3.0, I was glad that my compilation failed since if the old method was deprecated, I would be confused why I need to uptake the new approach and how to do it.

Having said that, this PR started as adding events for metrics, we removed it. It only contains this extra attribute. It is not a blocker for me. If we don't agree on how to proceed, I would rather abandon this PR and focus on the simplified metrics persistence code here: #3385. That allows Polaris to persist table metrics to the database.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thats fair, though in this case we are forcing the upgrades to have this new interface implementation when having the old api was not hurting ? taking the case LOGGERs can be configured to implicitly log timestamp additionally and the way we have reporting wired if i wanna do custom one can just do timestamp internally rather than requesting from the signature ?

   metricsReporter.reportMetric(
        catalogName, tableIdentifier, reportMetricsRequest.report(), clock.instant());

it not the time when the request hit the server but its the time when the reportMetric is called so does it matter of i do this vs

reportMetric(catalogName, tableIdentifier, reportMetricsRequest.report())) {
timestamp = clock.instant()
}

with that being said i think its fine if we wanna move forward :), but above is my thought process. Please move forward @dimas-b trust your judgement here !


/**
* Reports an Iceberg metrics report for a specific table.
*
* @param catalogName the name of the catalog containing the table
* @param table the identifier of the table the metrics are for
* @param metricsReport the Iceberg metrics report (e.g., {@link
* org.apache.iceberg.metrics.ScanReport} or {@link org.apache.iceberg.metrics.CommitReport})
* @param receivedTimestamp the timestamp when the metrics were received by Polaris
*/
void reportMetric(
String catalogName,
TableIdentifier table,
MetricsReport metricsReport,
Instant receivedTimestamp);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import org.apache.commons.lang3.function.TriConsumer;
import java.time.Instant;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.metrics.MetricsReport;
import org.junit.jupiter.api.Test;
Expand All @@ -31,14 +31,16 @@ public class DefaultMetricsReporterTest {
@Test
void testLogging() {
@SuppressWarnings("unchecked")
TriConsumer<String, TableIdentifier, MetricsReport> mockConsumer = mock(TriConsumer.class);
DefaultMetricsReporter.QuadConsumer<String, TableIdentifier, MetricsReport, Instant>
mockConsumer = mock(DefaultMetricsReporter.QuadConsumer.class);
DefaultMetricsReporter reporter = new DefaultMetricsReporter(mockConsumer);
String warehouse = "testWarehouse";
TableIdentifier table = TableIdentifier.of("testNamespace", "testTable");
MetricsReport metricsReport = mock(MetricsReport.class);
Instant receivedTimestamp = Instant.ofEpochMilli(1234567890L);

reporter.reportMetric(warehouse, table, metricsReport);
reporter.reportMetric(warehouse, table, metricsReport, receivedTimestamp);

verify(mockConsumer).accept(warehouse, table, metricsReport);
verify(mockConsumer).accept(warehouse, table, metricsReport, receivedTimestamp);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ public String getAuthenticationScheme() {
catalogHandlerUtils,
externalCatalogFactory,
storageAccessConfigProvider,
new DefaultMetricsReporter());
new DefaultMetricsReporter(),
Clock.systemUTC());

// Optionally wrap with event delegator
IcebergRestCatalogApiService finalRestCatalogService = catalogService;
Expand Down
Loading