relational-jdbc: add DB-agnostic idempotency store + model #3584
huaxingao wants to merge 8 commits into apache:main
cc @singhpk234 @flyrain Could you please take a look when you have a moment?
...g/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java
    QueryGenerator.PreparedQuery update =
        new QueryGenerator.PreparedQuery(
            sql,
            List.of(Timestamp.from(now), Timestamp.from(now), realmId, idempotencyKey, executorId));
Instead of using Timestamp.now(), can we use the injected Clock?
We’re not using Timestamp.now() here: the store takes Instant now as a parameter and only converts it via Timestamp.from(now).
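To make the reply concrete, here is a minimal sketch of how a caller could obtain the instant from a Clock and hand it to the store, which then only converts it with Timestamp.from. The class and method names (HeartbeatParamsSketch, heartbeatParams) are hypothetical and are not part of the PR; only the parameter order mirrors the snippet above.

```java
import java.sql.Timestamp;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.List;

final class HeartbeatParamsSketch {
  // Hypothetical helper: builds bind parameters from a caller-supplied instant.
  // The store never reads the wall clock itself; it only converts the instant.
  static List<Object> heartbeatParams(
      Instant now, String realmId, String idempotencyKey, String executorId) {
    Timestamp ts = Timestamp.from(now);
    return List.of(ts, ts, realmId, idempotencyKey, executorId);
  }

  public static void main(String[] args) {
    // In tests the caller can pass a fixed clock to get deterministic timestamps.
    Clock clock = Clock.fixed(Instant.parse("2025-01-01T00:00:00Z"), ZoneOffset.UTC);
    System.out.println(heartbeatParams(clock.instant(), "realm1", "key1", "exec1"));
  }
}
```

With this shape, the Clock stays at the call site and the store remains easy to test with any Instant.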
    if (datasourceOperations.isConstraintViolation(e)) {
      return new ReserveResult(ReserveResultType.DUPLICATE, load(realmId, idempotencyKey));
    }
    throw new RuntimeException("Failed to reserve idempotency key", e);
Can we have a dedicated exception type? What error code do we want to return for the request in this case?
Added IdempotencyPersistenceException
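For illustration only, a dedicated exception of this kind could look like the sketch below. The actual IdempotencyPersistenceException in the PR may have a different shape; the SqlAction interface and the wrap helper are assumptions introduced here, not code from the PR.

```java
import java.sql.SQLException;

final class IdempotencyExceptionSketch {
  /** Hypothetical shape; the real class in the PR may differ. */
  static class IdempotencyPersistenceException extends RuntimeException {
    IdempotencyPersistenceException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  /** Hypothetical functional interface for a JDBC call that may fail. */
  interface SqlAction<T> {
    T run() throws SQLException;
  }

  // Runs the action and translates SQLException into the dedicated exception,
  // so callers can catch one specific type instead of a bare RuntimeException.
  static <T> T wrap(String message, SqlAction<T> action) {
    try {
      return action.run();
    } catch (SQLException e) {
      throw new IdempotencyPersistenceException(message, e);
    }
  }
}
```

A caller could then write wrap("Failed to reserve idempotency key", () -> ...) and map the dedicated type to whatever response the request layer decides on.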
     * Normalized identifier of the resource affected by the operation.
     *
     * <p>This should be derived from the request (for example, a canonicalized path like {@code
     * "tables/ns.tbl"}), not from a generated internal entity id. This ensures the binding is
How do we protect against drop + create and renames?
Do we define the canonicalized path somewhere? Should we avoid duplicate canonicalized paths? A path like tables/ns.tbl is very likely to cause conflicts, as different catalogs could have the same namespace and table names.
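One way to picture the collision concern is a resource identifier that also carries the catalog name. The sketch below is purely hypothetical: the "catalogs/<catalog>/tables/<namespace>.<table>" format and the canonicalResourceId helper are assumptions for illustration, not something the PR defines.

```java
import java.util.List;

final class ResourceIdSketch {
  // Hypothetical format: prefixing the catalog name keeps equal namespace and
  // table names in different catalogs from mapping to the same identifier.
  static String canonicalResourceId(String catalog, List<String> namespace, String table) {
    return "catalogs/" + catalog + "/tables/" + String.join(".", namespace) + "." + table;
  }

  public static void main(String[] args) {
    System.out.println(canonicalResourceId("catalog_a", List.of("ns"), "tbl"));
    System.out.println(canonicalResourceId("catalog_b", List.of("ns"), "tbl"));
  }
}
```

Note that this only addresses cross-catalog collisions; it does not by itself answer the drop + create or rename question raised above.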
    - new PostgreSQLContainer<>("postgres:17.5-alpine");
    + new PostgreSQLContainer<>(
    +     containerSpecHelper("postgres", PostgresRelationalJdbcLifeCycleManagement.class)
    +         .dockerImageName(null)
Why null? Can we use the latest tag?
null is intentional here: containerSpecHelper(...).dockerImageName(null) means “use the pinned default from Dockerfile-postgres-version and allow overrides via system props/env.”
    public static String fullyQualifiedTableName(@Nonnull String tableName) {
      return getFullyQualifiedTableName(tableName);
    }
I'm confused: why create a new public function? Can't we just make the existing private function public?
Right, we don't need this. Removed.
    ModelIdempotencyRecord CONVERTER =
        ImmutableModelIdempotencyRecord.builder()
            .realmId("")
            .idempotencyKey("")
            .operationType("")
            .resourceId("")
            .createdAt(Instant.EPOCH)
            .updatedAt(Instant.EPOCH)
            .expiresAt(Instant.EPOCH)
            .build();
I didn't fully get the rationale for this.
Orthogonally, if it's absolutely required, let's make it static?
I’ve refactored this to avoid the dummy instance by introducing a static fromRow(ResultSet) and a stateless CONVERTER singleton that delegates to it.
    import org.apache.polaris.persistence.relational.jdbc.models.ImmutableModelIdempotencyRecord;
    import org.apache.polaris.persistence.relational.jdbc.models.ModelIdempotencyRecord;

    public class RelationalJdbcIdempotencyStore implements IdempotencyStore {
How is this object created? How is core going to use it?
RelationalJdbcIdempotencyStore is just the relational-jdbc backend for the IdempotencyStore interface. It’s constructed with a DataSource + RelationalJdbcConfiguration and then the request/idempotency layer calls reserve/load/updateHeartbeat/finalize/purgeExpired through the IdempotencyStore SPI.
    String REALM_ID = "realm_id";
    String IDEMPOTENCY_KEY = "idempotency_key";
    String OPERATION_TYPE = "operation_type";
    String RESOURCE_ID = "resource_id";

    String HTTP_STATUS = "http_status";
    String ERROR_SUBTYPE = "error_subtype";
    String RESPONSE_SUMMARY = "response_summary";
    String RESPONSE_HEADERS = "response_headers";
    String FINALIZED_AT = "finalized_at";

    String CREATED_AT = "created_at";
    String UPDATED_AT = "updated_at";
    String HEARTBEAT_AT = "heartbeat_at";
    String EXECUTOR_ID = "executor_id";
    String EXPIRES_AT = "expires_at";
We should add code comments for each of the fields, like we do in the rest of the models.
      Optional<IdempotencyRecord> existing = load(realmId, idempotencyKey);
      if (existing.isEmpty()) {
        return HeartbeatResult.NOT_FOUND;
      }

      IdempotencyRecord record = existing.get();
      if (record.getHttpStatus() != null) {
        return HeartbeatResult.FINALIZED;
      }
      if (record.getExecutorId() == null || !record.getExecutorId().equals(executorId)) {
        return HeartbeatResult.LOST_OWNERSHIP;
      }

      QueryGenerator.PreparedQuery update =
          QueryGenerator.generateUpdateQuery(
              ModelIdempotencyRecord.SELECT_COLUMNS,
              ModelIdempotencyRecord.TABLE_NAME,
              Map.of(
                  ModelIdempotencyRecord.HEARTBEAT_AT,
                  Timestamp.from(now),
                  ModelIdempotencyRecord.UPDATED_AT,
                  Timestamp.from(now)),
              Map.of(
                  ModelIdempotencyRecord.REALM_ID,
                  realmId,
                  ModelIdempotencyRecord.IDEMPOTENCY_KEY,
                  idempotencyKey,
                  ModelIdempotencyRecord.EXECUTOR_ID,
                  executorId),
              Map.of(),
              Map.of(),
              Set.of(ModelIdempotencyRecord.HTTP_STATUS),
              Set.of());

      try {
        int updated = datasourceOperations.executeUpdate(update);
        if (updated > 0) {
          return HeartbeatResult.UPDATED;
        }
      } catch (SQLException e) {
        throw new RuntimeException("Failed to update idempotency heartbeat", e);
      }

      // Raced with finalize/ownership loss; re-check to return a meaningful result.
      Optional<IdempotencyRecord> after = load(realmId, idempotencyKey);
      if (after.isEmpty()) {
        return HeartbeatResult.NOT_FOUND;
      }
      if (after.get().getHttpStatus() != null) {
        return HeartbeatResult.FINALIZED;
      }
      return HeartbeatResult.LOST_OWNERSHIP;
    }
Wouldn't this need to be in a transaction?
The actual heartbeat change is done via a single atomic UPDATE ... WHERE realm/key/executor AND http_status IS NULL, so it doesn’t require an explicit transaction for correctness.
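The shape of that single conditional UPDATE can be sketched as follows. This is not the PR's QueryGenerator; ConditionalUpdateSketch, its Prepared holder, and the table name idempotency_records are hypothetical stand-ins, while the column names come from the model constants quoted earlier. Ordered maps are used so that placeholders and parameters line up.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ConditionalUpdateSketch {
  /** SQL text plus bind parameters, in placeholder order. */
  static final class Prepared {
    final String sql;
    final List<Object> params;

    Prepared(String sql, List<Object> params) {
      this.sql = sql;
      this.params = params;
    }
  }

  // Builds "UPDATE t SET a = ?, ... WHERE x = ? AND ... AND y IS NULL".
  // Ownership and "not finalized" checks live in the WHERE clause, so the
  // database applies check and write as one statement.
  static Prepared update(
      String table,
      Map<String, Object> set,
      Map<String, Object> whereEquals,
      List<String> whereIsNull) {
    List<String> assignments = new ArrayList<>();
    List<String> conditions = new ArrayList<>();
    List<Object> params = new ArrayList<>();
    set.forEach((column, value) -> {
      assignments.add(column + " = ?");
      params.add(value);
    });
    whereEquals.forEach((column, value) -> {
      conditions.add(column + " = ?");
      params.add(value);
    });
    whereIsNull.forEach(column -> conditions.add(column + " IS NULL"));
    String sql = "UPDATE " + table
        + " SET " + String.join(", ", assignments)
        + " WHERE " + String.join(" AND ", conditions);
    return new Prepared(sql, params);
  }

  public static void main(String[] args) {
    Map<String, Object> set = new LinkedHashMap<>();
    set.put("heartbeat_at", "2025-01-01T00:00:00Z");
    set.put("updated_at", "2025-01-01T00:00:00Z");
    Map<String, Object> where = new LinkedHashMap<>();
    where.put("realm_id", "realm1");
    where.put("idempotency_key", "key1");
    where.put("executor_id", "exec1");
    // Table name is a placeholder for illustration.
    Prepared p = update("idempotency_records", set, where, List.of("http_status"));
    System.out.println(p.sql);
    System.out.println(p.params);
  }
}
```

If the statement reports zero updated rows, the caller knows the row was missing, finalized, or owned by another executor, which is why the store re-reads the record afterwards only to pick a meaningful result.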
            containerSpecHelper("postgres", PostgresRelationalJdbcLifeCycleManagement.class)
                .dockerImageName(null)
                .asCompatibleSubstituteFor("postgres"))
        .withDatabaseName("polaris_db")
We set the following in setup too; why do we need it here again?
These were the final set of comments :). I can't think of any more. Thanks @huaxingao for working on it and driving the whole idempotency effort!
flyrain left a comment
Thanks @huaxingao for the PR. LGTM overall. Left some comments.
     * <p>This follows the same pattern as {@link ModelEvent}, separating the storage representation
     * from the core domain model while still providing {@link Converter} helpers.
     */
    @PolarisImmutable
Do we need it to be PolarisImmutable? We didn't do that for ModelEntity, and a few others.
I removed @PolarisImmutable as suggested. That required a small follow-up refactor in RelationalJdbcIdempotencyStore.reserve(): we no longer build an ImmutableModelIdempotencyRecord just to get the insert bindings; instead we construct the insert map/values directly (same columns/values, still uses QueryGenerator.generateInsertQuery). This keeps behavior the same while avoiding the Immutables-generated type.
    }

    /** Returns the detected database type for this datasource. */
    public DatabaseType databaseType() {
Do we need this new method, given that we already have getDatabaseType() (line 82)? We could change its scope to public if needed.
    // Logical tenant / realm identifier.
    String REALM_ID = "realm_id";
    // Client-provided idempotency key.
Nit: We have so many nice comments here, while there are none on the class IdempotencyRecord. I'd love to have these comments on IdempotencyRecord as well.
Added more comments. Thanks!
     * <p>Callers should prefer passing an ordered map (e.g. {@link java.util.LinkedHashMap}) for the
     * set clause so generated SQL and parameter order are stable.
Nit: it feels a bit easier to understand if we use "consistent" or "matched". Just my personal preference; feel free to ignore it.
    -  * <p>Callers should prefer passing an ordered map (e.g. {@link java.util.LinkedHashMap}) for the
    -  * set clause so generated SQL and parameter order are stable.
    +  * <p>Callers should prefer passing an ordered map (e.g. {@link java.util.LinkedHashMap}) for the
    +  * set clause so generated SQL and parameter order are consistent.
Fixed. Thanks for the suggestion!
    validateColumns(columns, whereEquals.keySet());
    validateColumns(columns, whereGreater.keySet());
    validateColumns(columns, whereLess.keySet());
    validateColumns(columns, whereIsNull);
    validateColumns(columns, whereIsNotNull);
Do we need to validate them here, given that generateWhereClauseExtended() will validate them anyway?
You are right. Removed.
    validateColumns(columns, setClause.keySet());
    validateColumns(columns, whereEquals.keySet());
    validateColumns(columns, whereGreater.keySet());
    validateColumns(columns, whereLess.keySet());
    validateColumns(columns, whereIsNull);
    validateColumns(columns, whereIsNotNull);
    // Preserve the original behavior of rejecting unknown columns. This is used by SELECT query
    // generation too, not only by callers of the extended UPDATE/DELETE helpers.
Nit: I guess this comment is more like a dev log. Do we need it as a code comment?
        @Nonnull Set<String> tableColumns, @Nonnull Set<String> columns) {
      for (String column : columns) {
        if (!tableColumns.contains(column) && !column.equals("realm_id")) {
          throw new IllegalArgumentException("Invalid query column: " + column);
realm_id is treated as a special implicit column for some models but is explicitly included in SELECT_COLUMNS for ModelIdempotencyRecord. It might be cleaner to follow the other models, which keep realm_id out by using only ALL_COLUMNS instead of having SELECT_COLUMNS. WDYT?
Makes sense. Fixed.
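The implicit handling of realm_id under discussion can be seen in a standalone version of the quoted check. The loop body is taken from the snippet above; the wrapper class ColumnValidationSketch and the example column sets are assumptions added so the fragment can run on its own.

```java
import java.util.Set;

final class ColumnValidationSketch {
  // Rejects any queried column the model does not declare, except realm_id,
  // which is accepted implicitly even when the model's column set omits it.
  static void validateColumns(Set<String> tableColumns, Set<String> columns) {
    for (String column : columns) {
      if (!tableColumns.contains(column) && !column.equals("realm_id")) {
        throw new IllegalArgumentException("Invalid query column: " + column);
      }
    }
  }

  public static void main(String[] args) {
    // Hypothetical model column set that leaves realm_id out.
    Set<String> modelColumns = Set.of("idempotency_key", "executor_id");
    validateColumns(modelColumns, Set.of("idempotency_key", "realm_id")); // accepted
    try {
      validateColumns(modelColumns, Set.of("unknown_column"));
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Because realm_id passes validation either way, a model can leave it out of its declared columns and still filter on it, which is the convention the reviewer suggests following.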
#3205 followup:
- ModelIdempotencyRecord
- RelationalJdbcIdempotencyStore implementing IdempotencyStore.
- IdempotencyRecord to polaris-core under org.apache.polaris.core.entity and updating IdempotencyStore imports accordingly.
- PostgresIdempotencyStoreIT with RelationalJdbcIdempotencyStorePostgresIT

Checklist
- CHANGELOG.md (if needed)
- site/content/in-dev/unreleased (if needed)