Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.core.metrics.iceberg;

import java.time.Instant;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.apache.iceberg.metrics.CommitMetricsResult;
import org.apache.iceberg.metrics.CommitReport;
import org.apache.iceberg.metrics.CounterResult;
import org.apache.iceberg.metrics.ScanMetricsResult;
import org.apache.iceberg.metrics.ScanReport;
import org.apache.iceberg.metrics.TimerResult;
import org.apache.polaris.core.persistence.metrics.CommitMetricsRecord;
import org.apache.polaris.core.persistence.metrics.ScanMetricsRecord;

/**
* Converts Iceberg metrics reports to SPI record types using a fluent builder API.
*
* <p>This converter extracts all relevant metrics from Iceberg's {@link ScanReport} and {@link
* CommitReport} and combines them with context information to create persistence-ready records.
*
* <p>Example usage:
*
* <pre>{@code
* ScanMetricsRecord record = MetricsRecordConverter.forScanReport(scanReport)
* .catalogId(catalog.getId())
* .tableId(tableEntity.getId())
* .build();
* }</pre>
*/
public final class MetricsRecordConverter {

private MetricsRecordConverter() {
// Utility class
}

/**
* Creates a builder for converting a ScanReport to a ScanMetricsRecord.
*
* @param scanReport the Iceberg scan report
* @return builder for configuring the conversion
*/
public static ScanReportBuilder forScanReport(ScanReport scanReport) {
return new ScanReportBuilder(scanReport);
}

/**
* Creates a builder for converting a CommitReport to a CommitMetricsRecord.
*
* @param commitReport the Iceberg commit report
* @return builder for configuring the conversion
*/
public static CommitReportBuilder forCommitReport(CommitReport commitReport) {
return new CommitReportBuilder(commitReport);
}

/** Builder for converting ScanReport to ScanMetricsRecord. */
public static final class ScanReportBuilder {
private final ScanReport scanReport;
private long catalogId;
private long tableId;
private Instant timestamp;

private ScanReportBuilder(ScanReport scanReport) {
this.scanReport = scanReport;
}

public ScanReportBuilder catalogId(long catalogId) {
this.catalogId = catalogId;
return this;
}

/**
* Sets the table entity ID.
*
* <p>This is the internal Polaris entity ID for the table.
*
* @param tableId the table entity ID
* @return this builder
*/
public ScanReportBuilder tableId(long tableId) {
this.tableId = tableId;
return this;
}

/**
* Sets the timestamp for the metrics record.
*
* <p>This should be the time the metrics report was received by the server, which may differ
* from the time it was recorded by the client.
*
* @param timestamp the timestamp
* @return this builder
*/
public ScanReportBuilder timestamp(Instant timestamp) {
this.timestamp = timestamp;
return this;
}

public ScanMetricsRecord build() {
ScanMetricsResult metrics = scanReport.scanMetrics();
Map<String, String> reportMetadata =
scanReport.metadata() != null ? scanReport.metadata() : Collections.emptyMap();

return ScanMetricsRecord.builder()
.reportId(UUID.randomUUID().toString())
.catalogId(catalogId)
.tableId(tableId)
.timestamp(timestamp != null ? timestamp : Instant.now())
.snapshotId(Optional.of(scanReport.snapshotId()))
.schemaId(Optional.of(scanReport.schemaId()))
.filterExpression(
scanReport.filter() != null
? Optional.of(scanReport.filter().toString())
: Optional.empty())
.projectedFieldIds(
scanReport.projectedFieldIds() != null
? scanReport.projectedFieldIds()
: Collections.emptyList())
.projectedFieldNames(
scanReport.projectedFieldNames() != null
? scanReport.projectedFieldNames()
: Collections.emptyList())
Comment on lines +139 to +142
Copy link
Contributor

@singhpk234 singhpk234 Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: can we not make it nullable instead ?

.resultDataFiles(getCounterValue(metrics.resultDataFiles()))
.resultDeleteFiles(getCounterValue(metrics.resultDeleteFiles()))
.totalFileSizeBytes(getCounterValue(metrics.totalFileSizeInBytes()))
.totalDataManifests(getCounterValue(metrics.totalDataManifests()))
.totalDeleteManifests(getCounterValue(metrics.totalDeleteManifests()))
.scannedDataManifests(getCounterValue(metrics.scannedDataManifests()))
.scannedDeleteManifests(getCounterValue(metrics.scannedDeleteManifests()))
.skippedDataManifests(getCounterValue(metrics.skippedDataManifests()))
.skippedDeleteManifests(getCounterValue(metrics.skippedDeleteManifests()))
.skippedDataFiles(getCounterValue(metrics.skippedDataFiles()))
.skippedDeleteFiles(getCounterValue(metrics.skippedDeleteFiles()))
.totalPlanningDurationMs(getTimerValueMs(metrics.totalPlanningDuration()))
.equalityDeleteFiles(getCounterValue(metrics.equalityDeleteFiles()))
.positionalDeleteFiles(getCounterValue(metrics.positionalDeleteFiles()))
.indexedDeleteFiles(getCounterValue(metrics.indexedDeleteFiles()))
.totalDeleteFileSizeBytes(getCounterValue(metrics.totalDeleteFileSizeInBytes()))
.metadata(reportMetadata)
.build();
}
}

/** Builder for converting CommitReport to CommitMetricsRecord. */
public static final class CommitReportBuilder {
private final CommitReport commitReport;
private long catalogId;
private long tableId;
private Instant timestamp;

private CommitReportBuilder(CommitReport commitReport) {
this.commitReport = commitReport;
}

public CommitReportBuilder catalogId(long catalogId) {
this.catalogId = catalogId;
return this;
}

/**
* Sets the table entity ID.
*
* <p>This is the internal Polaris entity ID for the table.
*
* @param tableId the table entity ID
* @return this builder
*/
public CommitReportBuilder tableId(long tableId) {
this.tableId = tableId;
return this;
}

/**
* Sets the timestamp for the metrics record.
*
* <p>This should be the time the metrics report was received by the server, which may differ
* from the time it was recorded by the client.
*
* @param timestamp the timestamp
* @return this builder
*/
public CommitReportBuilder timestamp(Instant timestamp) {
this.timestamp = timestamp;
return this;
}

public CommitMetricsRecord build() {
CommitMetricsResult metrics = commitReport.commitMetrics();
Map<String, String> reportMetadata =
commitReport.metadata() != null ? commitReport.metadata() : Collections.emptyMap();

return CommitMetricsRecord.builder()
.reportId(UUID.randomUUID().toString())
.catalogId(catalogId)
.tableId(tableId)
.timestamp(timestamp != null ? timestamp : Instant.now())
.snapshotId(commitReport.snapshotId())
.sequenceNumber(Optional.of(commitReport.sequenceNumber()))
.operation(commitReport.operation())
.addedDataFiles(getCounterValue(metrics.addedDataFiles()))
.removedDataFiles(getCounterValue(metrics.removedDataFiles()))
.totalDataFiles(getCounterValue(metrics.totalDataFiles()))
.addedDeleteFiles(getCounterValue(metrics.addedDeleteFiles()))
.removedDeleteFiles(getCounterValue(metrics.removedDeleteFiles()))
.totalDeleteFiles(getCounterValue(metrics.totalDeleteFiles()))
.addedEqualityDeleteFiles(getCounterValue(metrics.addedEqualityDeleteFiles()))
.removedEqualityDeleteFiles(getCounterValue(metrics.removedEqualityDeleteFiles()))
.addedPositionalDeleteFiles(getCounterValue(metrics.addedPositionalDeleteFiles()))
.removedPositionalDeleteFiles(getCounterValue(metrics.removedPositionalDeleteFiles()))
.addedRecords(getCounterValue(metrics.addedRecords()))
.removedRecords(getCounterValue(metrics.removedRecords()))
.totalRecords(getCounterValue(metrics.totalRecords()))
.addedFileSizeBytes(getCounterValue(metrics.addedFilesSizeInBytes()))
.removedFileSizeBytes(getCounterValue(metrics.removedFilesSizeInBytes()))
.totalFileSizeBytes(getCounterValue(metrics.totalFilesSizeInBytes()))
.totalDurationMs(getTimerValueMsOpt(metrics.totalDuration()))
.attempts(getCounterValueInt(metrics.attempts()))
.metadata(reportMetadata)
.build();
}
}

// === Helper Methods ===

private static long getCounterValue(CounterResult counter) {
if (counter == null) {
return 0L;
}
return counter.value();
}

private static int getCounterValueInt(CounterResult counter) {
if (counter == null) {
return 0;
}
return (int) counter.value();
}

private static long getTimerValueMs(TimerResult timer) {
if (timer == null || timer.totalDuration() == null) {
return 0L;
}
return timer.totalDuration().toMillis();
}

private static Optional<Long> getTimerValueMsOpt(TimerResult timer) {
if (timer == null || timer.totalDuration() == null) {
return Optional.empty();
}
return Optional.of(timer.totalDuration().toMillis());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.core.persistence.metrics;

import com.google.common.annotations.Beta;
import java.util.Optional;
import org.apache.polaris.immutables.PolarisImmutable;

/**
* Backend-agnostic representation of an Iceberg commit metrics report.
*
* <p>This record captures all relevant metrics from an Iceberg {@code CommitReport} along with
* contextual information such as catalog identification and table location.
*
* <p>Common identification fields are inherited from {@link MetricsRecordIdentity}.
*
* <p>Note: Realm ID is not included in this record. Multi-tenancy realm context should be obtained
* from the CDI-injected {@code RealmContext} at persistence time.
*
* <p><b>Note:</b> This type is part of the experimental Metrics Persistence SPI and may change in
* future releases.
*/
@Beta
@PolarisImmutable
public interface CommitMetricsRecord extends MetricsRecordIdentity {

// === Commit Context ===

/** Snapshot ID created by this commit. */
long snapshotId();

/** Sequence number of the snapshot. */
Optional<Long> sequenceNumber();

/** Operation type (e.g., "append", "overwrite", "delete"). */
String operation();

// === File Metrics - Data Files ===

/** Number of data files added. */
long addedDataFiles();

/** Number of data files removed. */
long removedDataFiles();

/** Total number of data files after commit. */
long totalDataFiles();

// === File Metrics - Delete Files ===

/** Number of delete files added. */
long addedDeleteFiles();

/** Number of delete files removed. */
long removedDeleteFiles();

/** Total number of delete files after commit. */
long totalDeleteFiles();

/** Number of equality delete files added. */
long addedEqualityDeleteFiles();

/** Number of equality delete files removed. */
long removedEqualityDeleteFiles();

/** Number of positional delete files added. */
long addedPositionalDeleteFiles();

/** Number of positional delete files removed. */
long removedPositionalDeleteFiles();

// === Record Metrics ===

/** Number of records added. */
long addedRecords();

/** Number of records removed. */
long removedRecords();

/** Total number of records after commit. */
long totalRecords();

// === Size Metrics ===

/** Size of added files in bytes. */
long addedFileSizeBytes();

/** Size of removed files in bytes. */
long removedFileSizeBytes();

/** Total file size in bytes after commit. */
long totalFileSizeBytes();

// === Timing ===

/** Total duration of the commit in milliseconds. */
Optional<Long> totalDurationMs();

/** Number of commit attempts. */
int attempts();

/**
* Creates a new builder for CommitMetricsRecord.
*
* @return a new builder instance
*/
static ImmutableCommitMetricsRecord.Builder builder() {
return ImmutableCommitMetricsRecord.builder();
}
}
Loading