-
Notifications
You must be signed in to change notification settings - Fork 3k
[REST | SPARK]: Reference implementation of referenced-by in the loadTable call #13979
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,62 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.iceberg.catalog; | ||
|
|
||
| import java.util.Map; | ||
| import org.apache.iceberg.Table; | ||
| import org.apache.iceberg.exceptions.NoSuchTableException; | ||
|
|
||
| /** | ||
| * Extension interface for Catalog that supports context-aware table loading. This enables passing | ||
| * additional context (such as view information) when loading tables. | ||
| */ | ||
| public interface ContextAwareTableCatalog { | ||
|
|
||
| /** | ||
| * Context key for the view identifier(s) that reference the table being loaded. | ||
| * | ||
| * <p>The value should be a List<TableIdentifier> representing the view(s) that reference | ||
| * the table. For nested views (a view referencing another view which references the table), the | ||
| * list should be ordered from outermost view to the view that directly references the table. | ||
| * | ||
| * <p>This structured format keeps namespace parts and view names separate throughout the catalog | ||
| * layer, only being serialized into the REST API format when making the actual HTTP request | ||
| * according to the spec: namespace parts joined by namespace separator (default 0x1F), followed | ||
| * by dot, followed by view name, with multiple views comma-separated. | ||
| */ | ||
| String VIEW_IDENTIFIER_KEY = "view.referenced-by"; | ||
|
|
||
| /** | ||
| * Load a table with additional context information. | ||
| * | ||
| * <p>Common context keys: | ||
| * | ||
| * <ul> | ||
| * <li>{@link #VIEW_IDENTIFIER_KEY}: A List<TableIdentifier> of view(s) referencing this | ||
| * table | ||
| * </ul> | ||
| * | ||
| * @param identifier the table identifier to load | ||
| * @param loadingContext additional context information as key-value pairs | ||
| * @return the loaded table | ||
| * @throws NoSuchTableException if the table does not exist | ||
| */ | ||
| Table loadTable(TableIdentifier identifier, Map<String, Object> loadingContext) | ||
| throws NoSuchTableException; | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if the referencing view doesn't exist? Shouldn't we check for the referencing view and throw NoSuchViewException, or should we silently omit if the view doesn't exist?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the concern that its a public API, that we should validate this ? My intention was to treat this as a runtime property, I renamed this API to loadTableWithContext, please let me know if this helps addressing this concern. |
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -361,4 +361,9 @@ default boolean namespaceExists(SessionContext context, Namespace namespace) { | |
| return false; | ||
| } | ||
| } | ||
|
|
||
| default Table loadTableWithContext( | ||
| SessionContext sessionContext, TableIdentifier ident, Map<String, Object> context) { | ||
| return loadTable(sessionContext, ident); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Have you considered throwing UnsupportedOperationException instead of silently ignore the context param? |
||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -62,7 +62,7 @@ public <T> T withContext(SessionContext context, Function<Catalog, T> task) { | |
| return task.apply(asCatalog(context)); | ||
| } | ||
|
|
||
| public class AsCatalog implements Catalog, SupportsNamespaces { | ||
| public class AsCatalog implements Catalog, ContextAwareTableCatalog, SupportsNamespaces { | ||
| private final SessionContext context; | ||
|
|
||
| private AsCatalog(SessionContext context) { | ||
|
|
@@ -159,5 +159,10 @@ public boolean removeProperties(Namespace namespace, Set<String> removals) { | |
| public boolean namespaceExists(Namespace namespace) { | ||
| return BaseSessionCatalog.this.namespaceExists(context, namespace); | ||
| } | ||
|
|
||
| @Override | ||
| public Table loadTable(TableIdentifier identifier, Map<String, Object> loadingContext) { | ||
| return BaseSessionCatalog.this.loadTableWithContext(context, identifier, loadingContext); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: maybe consider renaming |
||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,6 +30,7 @@ | |
| import org.apache.iceberg.Table; | ||
| import org.apache.iceberg.Transaction; | ||
| import org.apache.iceberg.catalog.Catalog; | ||
| import org.apache.iceberg.catalog.ContextAwareTableCatalog; | ||
| import org.apache.iceberg.catalog.Namespace; | ||
| import org.apache.iceberg.catalog.SessionCatalog; | ||
| import org.apache.iceberg.catalog.SupportsNamespaces; | ||
|
|
@@ -45,7 +46,12 @@ | |
| import org.apache.iceberg.view.ViewBuilder; | ||
|
|
||
| public class RESTCatalog | ||
| implements Catalog, ViewCatalog, SupportsNamespaces, Configurable<Object>, Closeable { | ||
| implements Catalog, | ||
| ContextAwareTableCatalog, | ||
| ViewCatalog, | ||
| SupportsNamespaces, | ||
| Configurable<Object>, | ||
| Closeable { | ||
| private final RESTSessionCatalog sessionCatalog; | ||
| private final Catalog delegate; | ||
| private final SupportsNamespaces nsDelegate; | ||
|
|
@@ -120,6 +126,15 @@ public Table loadTable(TableIdentifier ident) { | |
| return delegate.loadTable(ident); | ||
| } | ||
|
|
||
| @Override | ||
| public Table loadTable(TableIdentifier identifier, Map<String, Object> loadingContext) { | ||
| if (delegate instanceof ContextAwareTableCatalog) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I see the concept in this class is rather to have a new member of type Would this also work here? Do I miss something? |
||
| ContextAwareTableCatalog catalog = (ContextAwareTableCatalog) delegate; | ||
| return catalog.loadTable(identifier, loadingContext); | ||
| } | ||
| return loadTable(identifier); | ||
| } | ||
|
|
||
| @Override | ||
| public void invalidateTable(TableIdentifier ident) { | ||
| delegate.invalidateTable(ident); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -46,6 +46,7 @@ | |
| import org.apache.iceberg.Transactions; | ||
| import org.apache.iceberg.catalog.BaseViewSessionCatalog; | ||
| import org.apache.iceberg.catalog.Catalog; | ||
| import org.apache.iceberg.catalog.ContextAwareTableCatalog; | ||
| import org.apache.iceberg.catalog.Namespace; | ||
| import org.apache.iceberg.catalog.TableCommit; | ||
| import org.apache.iceberg.catalog.TableIdentifier; | ||
|
|
@@ -396,46 +397,96 @@ private static Map<String, String> snapshotModeToParam(SnapshotMode mode) { | |
| } | ||
|
|
||
| private LoadTableResponse loadInternal( | ||
| SessionContext context, TableIdentifier identifier, SnapshotMode mode) { | ||
| SessionContext context, | ||
| TableIdentifier identifier, | ||
| SnapshotMode mode, | ||
| Map<String, Object> viewContext) { | ||
| Endpoint.check(endpoints, Endpoint.V1_LOAD_TABLE); | ||
| AuthSession contextualSession = authManager.contextualSession(context, catalogAuth); | ||
|
|
||
| return client | ||
| .withAuthSession(contextualSession) | ||
| .get( | ||
| paths.table(identifier), | ||
| snapshotModeToParam(mode), | ||
| referencedByToQueryParam(snapshotModeToParam(mode), viewContext), | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: here at the calcite the reader could have the expression that the order of |
||
| LoadTableResponse.class, | ||
| Map.of(), | ||
| ErrorHandlers.tableErrorHandler()); | ||
| } | ||
|
|
||
| private Map<String, String> referencedByToQueryParam( | ||
| Map<String, String> params, Map<String, Object> context) { | ||
| if (context.isEmpty() || !context.containsKey(ContextAwareTableCatalog.VIEW_IDENTIFIER_KEY)) { | ||
| return params; | ||
| } | ||
|
|
||
| Map<String, String> queryParams = Maps.newHashMap(params); | ||
| Object viewIdentifierObj = context.get(ContextAwareTableCatalog.VIEW_IDENTIFIER_KEY); | ||
|
|
||
| if (!(viewIdentifierObj instanceof List)) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: maybe |
||
| throw new IllegalStateException( | ||
| "Invalid view identifier in context, expected List<TableIdentifier>: " | ||
| + viewIdentifierObj); | ||
| } | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| List<TableIdentifier> viewIdentifiers = (List<TableIdentifier>) viewIdentifierObj; | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't we include a check into the above that not just check for List type but for List? Wouldn't this result in a ClassCastException or something similar if we input List that contains anything other than TableIdentifier? |
||
|
|
||
| if (viewIdentifiers.isEmpty()) { | ||
| return params; | ||
| } | ||
|
|
||
| // Format per REST API spec: | ||
| // - Namespace parts joined by namespace separator (0x1F, URL-encoded as %1F) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. separator is configurable now, the comment now is invalid |
||
| // - Namespace and view name separated by dot (.) | ||
| // - Multiple views separated by comma (,) | ||
| // - Commas in view names must be percent-encoded as %2C | ||
| List<String> formattedViews = Lists.newArrayListWithCapacity(viewIdentifiers.size()); | ||
| for (TableIdentifier viewId : viewIdentifiers) { | ||
| // Encode namespace using RESTUtil which handles the namespace separator | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: I don't think these comments in the for loop are necessary. The code seems pretty straightforward and the longer comment above the loop also helps |
||
| String encodedNamespace = RESTUtil.encodeNamespace(viewId.namespace()); | ||
|
|
||
| // Encode view name - commas must be encoded as %2C | ||
| String encodedViewName = RESTUtil.encodeString(viewId.name()); | ||
|
|
||
| // Combine: encodedNamespace + "." + encodedViewName | ||
| // Note: RESTUtil.encodeString already encodes the dot if present in the view name, | ||
| // but we add an unencoded dot as the separator | ||
| formattedViews.add(encodedNamespace + "." + encodedViewName); | ||
| } | ||
|
|
||
| // Join multiple views with comma | ||
| queryParams.put("referenced-by", String.join(",", formattedViews)); | ||
|
|
||
| return queryParams; | ||
| } | ||
|
|
||
| @Override | ||
| public Table loadTable(SessionContext context, TableIdentifier identifier) { | ||
| public Table loadTableWithContext( | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe this had been discussed already, but can't this be simply |
||
| SessionContext sessionContext, TableIdentifier identifier, Map<String, Object> context) { | ||
| Endpoint.check( | ||
| endpoints, | ||
| Endpoint.V1_LOAD_TABLE, | ||
| () -> | ||
| new NoSuchTableException( | ||
| "Unable to load table %s.%s: Server does not support endpoint %s", | ||
| name(), identifier, Endpoint.V1_LOAD_TABLE)); | ||
|
|
||
| checkIdentifierIsValid(identifier); | ||
|
|
||
| MetadataTableType metadataType; | ||
| LoadTableResponse response; | ||
| TableIdentifier loadedIdent; | ||
| TableIdentifier loadedIdent = identifier; | ||
|
|
||
| try { | ||
| response = loadInternal(context, identifier, snapshotMode); | ||
| loadedIdent = identifier; | ||
| response = loadInternal(sessionContext, identifier, snapshotMode, context); | ||
| metadataType = null; | ||
|
|
||
| } catch (NoSuchTableException original) { | ||
| metadataType = MetadataTableType.from(identifier.name()); | ||
| metadataType = MetadataTableType.from(loadedIdent.name()); | ||
| if (metadataType != null) { | ||
| // attempt to load a metadata table using the identifier's namespace as the base table | ||
| TableIdentifier baseIdent = TableIdentifier.of(identifier.namespace().levels()); | ||
| TableIdentifier baseIdent = TableIdentifier.of(loadedIdent.namespace().levels()); | ||
| try { | ||
| response = loadInternal(context, baseIdent, snapshotMode); | ||
| response = loadInternal(sessionContext, baseIdent, snapshotMode, context); | ||
| loadedIdent = baseIdent; | ||
| } catch (NoSuchTableException ignored) { | ||
| // the base table does not exist | ||
|
|
@@ -449,7 +500,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { | |
|
|
||
| TableIdentifier finalIdentifier = loadedIdent; | ||
| Map<String, String> tableConf = response.config(); | ||
| AuthSession contextualSession = authManager.contextualSession(context, catalogAuth); | ||
| AuthSession contextualSession = authManager.contextualSession(sessionContext, catalogAuth); | ||
| AuthSession tableSession = | ||
| authManager.tableSession(finalIdentifier, tableConf, contextualSession); | ||
| TableMetadata tableMetadata; | ||
|
|
@@ -461,7 +512,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { | |
| .setPreviousFileLocation(null) | ||
| .setSnapshotsSupplier( | ||
| () -> | ||
| loadInternal(context, finalIdentifier, SnapshotMode.ALL) | ||
| loadInternal(sessionContext, finalIdentifier, SnapshotMode.ALL, context) | ||
| .tableMetadata() | ||
| .snapshots()) | ||
| .discardChanges() | ||
|
|
@@ -477,7 +528,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { | |
| paths.table(finalIdentifier), | ||
| Map::of, | ||
| mutationHeaders, | ||
| tableFileIO(context, tableConf, response.credentials()), | ||
| tableFileIO(sessionContext, tableConf, response.credentials()), | ||
| tableMetadata, | ||
| endpoints); | ||
|
|
||
|
|
@@ -521,6 +572,11 @@ private RESTTable restTableForScanPlanning( | |
| return null; | ||
| } | ||
|
|
||
| @Override | ||
| public Table loadTable(SessionContext context, TableIdentifier identifier) { | ||
| return loadTableWithContext(context, identifier, Map.of()); | ||
| } | ||
|
|
||
| private void trackFileIO(RESTTableOperations ops) { | ||
| if (io != ops.io()) { | ||
| fileIOTracker.track(ops); | ||
|
|
@@ -904,7 +960,7 @@ public Transaction replaceTransaction() { | |
| throw new AlreadyExistsException("View with same name already exists: %s", ident); | ||
| } | ||
|
|
||
| LoadTableResponse response = loadInternal(context, ident, snapshotMode); | ||
| LoadTableResponse response = loadInternal(context, ident, snapshotMode, Map.of()); | ||
| String fullName = fullTableName(ident); | ||
|
|
||
| Map<String, String> tableConf = response.config(); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it necessary to escape the lt and gt chars?
"List<TableIdentifier>"