-
Notifications
You must be signed in to change notification settings - Fork 366
feat(metrics): Add MetricsPersistence SPI for backend-agnostic metrics storage (#3337) #3616
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
obelix74
wants to merge
11
commits into
apache:main
Choose a base branch
from
obelix74:feat-3337-metrics-persistence-spi
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+1,172
−0
Open
Changes from all commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
af07d42
feat(metrics): Add MetricsPersistence SPI for backend-agnostic metric…
bceb136
refactor(metrics): Remove ambient context fields from SPI records
01d8e93
refactor(metrics): Replace offset-based pagination with PageToken pat…
59c907f
Review comments
9ec22ea
Review comments
b66a0d6
refactor: Remove TableIdentifier and catalogName from SPI records
7ac814e
refactor: Use List<String> for namespace instead of dot-separated string
4a3bc9f
refactor: Use tableId instead of tableName in metrics records
7d4212c
refactor: Remove namespace from MetricsQueryCriteria
53a00cd
feat: mark Metrics Persistence SPI as @Beta (experimental)
50faca3
feat: Add timestamp() method to MetricsRecordConverter builders
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
300 changes: 300 additions & 0 deletions
300
...is-core/src/main/java/org/apache/polaris/core/metrics/iceberg/MetricsRecordConverter.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,300 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.polaris.core.metrics.iceberg; | ||
|
|
||
| import java.time.Instant; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Optional; | ||
| import java.util.UUID; | ||
| import org.apache.iceberg.metrics.CommitMetricsResult; | ||
| import org.apache.iceberg.metrics.CommitReport; | ||
| import org.apache.iceberg.metrics.CounterResult; | ||
| import org.apache.iceberg.metrics.ScanMetricsResult; | ||
| import org.apache.iceberg.metrics.ScanReport; | ||
| import org.apache.iceberg.metrics.TimerResult; | ||
| import org.apache.polaris.core.persistence.metrics.CommitMetricsRecord; | ||
| import org.apache.polaris.core.persistence.metrics.ScanMetricsRecord; | ||
|
|
||
| /** | ||
| * Converts Iceberg metrics reports to SPI record types using a fluent builder API. | ||
| * | ||
| * <p>This converter extracts all relevant metrics from Iceberg's {@link ScanReport} and {@link | ||
| * CommitReport} and combines them with context information to create persistence-ready records. | ||
| * | ||
| * <p>Example usage: | ||
| * | ||
| * <pre>{@code | ||
| * ScanMetricsRecord record = MetricsRecordConverter.forScanReport(scanReport) | ||
| * .catalogId(catalog.getId()) | ||
| * .tableId(tableEntity.getId()) | ||
| * .namespace(namespace) | ||
| * .build(); | ||
| * }</pre> | ||
| */ | ||
| public final class MetricsRecordConverter { | ||
|
|
||
| private MetricsRecordConverter() { | ||
| // Utility class | ||
| } | ||
|
|
||
| /** | ||
| * Creates a builder for converting a ScanReport to a ScanMetricsRecord. | ||
| * | ||
| * @param scanReport the Iceberg scan report | ||
| * @return builder for configuring the conversion | ||
| */ | ||
| public static ScanReportBuilder forScanReport(ScanReport scanReport) { | ||
| return new ScanReportBuilder(scanReport); | ||
| } | ||
|
|
||
| /** | ||
| * Creates a builder for converting a CommitReport to a CommitMetricsRecord. | ||
| * | ||
| * @param commitReport the Iceberg commit report | ||
| * @return builder for configuring the conversion | ||
| */ | ||
| public static CommitReportBuilder forCommitReport(CommitReport commitReport) { | ||
| return new CommitReportBuilder(commitReport); | ||
| } | ||
|
|
||
| /** Builder for converting ScanReport to ScanMetricsRecord. */ | ||
| public static final class ScanReportBuilder { | ||
| private final ScanReport scanReport; | ||
| private long catalogId; | ||
| private long tableId; | ||
| private List<String> namespace = Collections.emptyList(); | ||
| private Instant timestamp; | ||
|
|
||
| private ScanReportBuilder(ScanReport scanReport) { | ||
| this.scanReport = scanReport; | ||
| } | ||
|
|
||
| public ScanReportBuilder catalogId(long catalogId) { | ||
| this.catalogId = catalogId; | ||
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Sets the table entity ID. | ||
| * | ||
| * <p>This is the internal Polaris entity ID for the table. | ||
| * | ||
| * @param tableId the table entity ID | ||
| * @return this builder | ||
| */ | ||
| public ScanReportBuilder tableId(long tableId) { | ||
| this.tableId = tableId; | ||
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Sets the namespace as a list of levels. | ||
| * | ||
| * @param namespace the namespace levels | ||
| * @return this builder | ||
| */ | ||
| public ScanReportBuilder namespace(List<String> namespace) { | ||
| this.namespace = namespace != null ? namespace : Collections.emptyList(); | ||
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Sets the timestamp for the metrics record. | ||
| * | ||
| * <p>This should be the time the metrics report was received by the server, which may differ | ||
| * from the time it was recorded by the client. | ||
| * | ||
| * @param timestamp the timestamp | ||
| * @return this builder | ||
| */ | ||
| public ScanReportBuilder timestamp(Instant timestamp) { | ||
| this.timestamp = timestamp; | ||
obelix74 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| return this; | ||
| } | ||
|
|
||
| public ScanMetricsRecord build() { | ||
| ScanMetricsResult metrics = scanReport.scanMetrics(); | ||
| Map<String, String> reportMetadata = | ||
| scanReport.metadata() != null ? scanReport.metadata() : Collections.emptyMap(); | ||
|
|
||
| return ScanMetricsRecord.builder() | ||
| .reportId(UUID.randomUUID().toString()) | ||
| .catalogId(catalogId) | ||
| .namespace(namespace) | ||
| .tableId(tableId) | ||
| .timestamp(timestamp != null ? timestamp : Instant.now()) | ||
| .snapshotId(Optional.of(scanReport.snapshotId())) | ||
| .schemaId(Optional.of(scanReport.schemaId())) | ||
| .filterExpression( | ||
| scanReport.filter() != null | ||
| ? Optional.of(scanReport.filter().toString()) | ||
| : Optional.empty()) | ||
| .projectedFieldIds( | ||
| scanReport.projectedFieldIds() != null | ||
| ? scanReport.projectedFieldIds() | ||
| : Collections.emptyList()) | ||
| .projectedFieldNames( | ||
| scanReport.projectedFieldNames() != null | ||
| ? scanReport.projectedFieldNames() | ||
| : Collections.emptyList()) | ||
|
Comment on lines
+154
to
+157
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. minor: can we not make it nullable instead ? |
||
| .resultDataFiles(getCounterValue(metrics.resultDataFiles())) | ||
| .resultDeleteFiles(getCounterValue(metrics.resultDeleteFiles())) | ||
| .totalFileSizeBytes(getCounterValue(metrics.totalFileSizeInBytes())) | ||
| .totalDataManifests(getCounterValue(metrics.totalDataManifests())) | ||
| .totalDeleteManifests(getCounterValue(metrics.totalDeleteManifests())) | ||
| .scannedDataManifests(getCounterValue(metrics.scannedDataManifests())) | ||
| .scannedDeleteManifests(getCounterValue(metrics.scannedDeleteManifests())) | ||
| .skippedDataManifests(getCounterValue(metrics.skippedDataManifests())) | ||
| .skippedDeleteManifests(getCounterValue(metrics.skippedDeleteManifests())) | ||
| .skippedDataFiles(getCounterValue(metrics.skippedDataFiles())) | ||
| .skippedDeleteFiles(getCounterValue(metrics.skippedDeleteFiles())) | ||
| .totalPlanningDurationMs(getTimerValueMs(metrics.totalPlanningDuration())) | ||
| .equalityDeleteFiles(getCounterValue(metrics.equalityDeleteFiles())) | ||
| .positionalDeleteFiles(getCounterValue(metrics.positionalDeleteFiles())) | ||
| .indexedDeleteFiles(getCounterValue(metrics.indexedDeleteFiles())) | ||
| .totalDeleteFileSizeBytes(getCounterValue(metrics.totalDeleteFileSizeInBytes())) | ||
| .metadata(reportMetadata) | ||
| .build(); | ||
| } | ||
| } | ||
|
|
||
| /** Builder for converting CommitReport to CommitMetricsRecord. */ | ||
| public static final class CommitReportBuilder { | ||
| private final CommitReport commitReport; | ||
| private long catalogId; | ||
| private long tableId; | ||
| private List<String> namespace = Collections.emptyList(); | ||
| private Instant timestamp; | ||
|
|
||
| private CommitReportBuilder(CommitReport commitReport) { | ||
| this.commitReport = commitReport; | ||
| } | ||
|
|
||
| public CommitReportBuilder catalogId(long catalogId) { | ||
| this.catalogId = catalogId; | ||
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Sets the table entity ID. | ||
| * | ||
| * <p>This is the internal Polaris entity ID for the table. | ||
| * | ||
| * @param tableId the table entity ID | ||
| * @return this builder | ||
| */ | ||
| public CommitReportBuilder tableId(long tableId) { | ||
| this.tableId = tableId; | ||
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Sets the namespace as a list of levels. | ||
| * | ||
| * @param namespace the namespace levels | ||
| * @return this builder | ||
| */ | ||
| public CommitReportBuilder namespace(List<String> namespace) { | ||
| this.namespace = namespace != null ? namespace : Collections.emptyList(); | ||
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Sets the timestamp for the metrics record. | ||
| * | ||
| * <p>This should be the time the metrics report was received by the server, which may differ | ||
| * from the time it was recorded by the client. | ||
| * | ||
| * @param timestamp the timestamp | ||
| * @return this builder | ||
| */ | ||
| public CommitReportBuilder timestamp(Instant timestamp) { | ||
| this.timestamp = timestamp; | ||
| return this; | ||
| } | ||
|
|
||
| public CommitMetricsRecord build() { | ||
| CommitMetricsResult metrics = commitReport.commitMetrics(); | ||
| Map<String, String> reportMetadata = | ||
| commitReport.metadata() != null ? commitReport.metadata() : Collections.emptyMap(); | ||
|
|
||
| return CommitMetricsRecord.builder() | ||
| .reportId(UUID.randomUUID().toString()) | ||
| .catalogId(catalogId) | ||
| .namespace(namespace) | ||
| .tableId(tableId) | ||
| .timestamp(timestamp != null ? timestamp : Instant.now()) | ||
| .snapshotId(commitReport.snapshotId()) | ||
| .sequenceNumber(Optional.of(commitReport.sequenceNumber())) | ||
| .operation(commitReport.operation()) | ||
| .addedDataFiles(getCounterValue(metrics.addedDataFiles())) | ||
| .removedDataFiles(getCounterValue(metrics.removedDataFiles())) | ||
| .totalDataFiles(getCounterValue(metrics.totalDataFiles())) | ||
| .addedDeleteFiles(getCounterValue(metrics.addedDeleteFiles())) | ||
| .removedDeleteFiles(getCounterValue(metrics.removedDeleteFiles())) | ||
| .totalDeleteFiles(getCounterValue(metrics.totalDeleteFiles())) | ||
| .addedEqualityDeleteFiles(getCounterValue(metrics.addedEqualityDeleteFiles())) | ||
| .removedEqualityDeleteFiles(getCounterValue(metrics.removedEqualityDeleteFiles())) | ||
| .addedPositionalDeleteFiles(getCounterValue(metrics.addedPositionalDeleteFiles())) | ||
| .removedPositionalDeleteFiles(getCounterValue(metrics.removedPositionalDeleteFiles())) | ||
| .addedRecords(getCounterValue(metrics.addedRecords())) | ||
| .removedRecords(getCounterValue(metrics.removedRecords())) | ||
| .totalRecords(getCounterValue(metrics.totalRecords())) | ||
| .addedFileSizeBytes(getCounterValue(metrics.addedFilesSizeInBytes())) | ||
| .removedFileSizeBytes(getCounterValue(metrics.removedFilesSizeInBytes())) | ||
| .totalFileSizeBytes(getCounterValue(metrics.totalFilesSizeInBytes())) | ||
| .totalDurationMs(getTimerValueMsOpt(metrics.totalDuration())) | ||
| .attempts(getCounterValueInt(metrics.attempts())) | ||
| .metadata(reportMetadata) | ||
| .build(); | ||
| } | ||
| } | ||
|
|
||
| // === Helper Methods === | ||
|
|
||
| private static long getCounterValue(CounterResult counter) { | ||
| if (counter == null) { | ||
| return 0L; | ||
| } | ||
| return counter.value(); | ||
| } | ||
|
|
||
| private static int getCounterValueInt(CounterResult counter) { | ||
| if (counter == null) { | ||
| return 0; | ||
| } | ||
| return (int) counter.value(); | ||
| } | ||
|
|
||
| private static long getTimerValueMs(TimerResult timer) { | ||
| if (timer == null || timer.totalDuration() == null) { | ||
| return 0L; | ||
| } | ||
| return timer.totalDuration().toMillis(); | ||
| } | ||
|
|
||
| private static Optional<Long> getTimerValueMsOpt(TimerResult timer) { | ||
| if (timer == null || timer.totalDuration() == null) { | ||
| return Optional.empty(); | ||
| } | ||
| return Optional.of(timer.totalDuration().toMillis()); | ||
| } | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor: why is namespace not an entity id ?