From 80aed3c0fbf2b21b0ce50effda0b5d78ab16c2ab Mon Sep 17 00:00:00 2001 From: Prashant Singh Date: Mon, 3 Nov 2025 04:06:05 -0800 Subject: [PATCH 1/2] Reference implementation of referenced-by in the loadTable call --- .../catalog/ContextAwareTableCatalog.java | 51 ++++++++++++++ .../iceberg/catalog/SessionCatalog.java | 5 ++ .../iceberg/catalog/BaseSessionCatalog.java | 7 +- .../org/apache/iceberg/rest/RESTCatalog.java | 17 ++++- .../iceberg/rest/RESTSessionCatalog.java | 68 +++++++++++++++---- .../sql/catalyst/analysis/ResolveViews.scala | 64 +++++++++++++---- .../views/UnResolvedRelationFromView.scala | 66 ++++++++++++++++++ .../spark/ContextAwareTableCatalog.java | 36 ++++++++++ .../apache/iceberg/spark/SparkCatalog.java | 43 ++++++++++-- .../iceberg/spark/SparkSessionCatalog.java | 57 +++++++++++++--- 10 files changed, 367 insertions(+), 47 deletions(-) create mode 100644 api/src/main/java/org/apache/iceberg/catalog/ContextAwareTableCatalog.java create mode 100644 spark/v4.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/UnResolvedRelationFromView.scala create mode 100644 spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/ContextAwareTableCatalog.java diff --git a/api/src/main/java/org/apache/iceberg/catalog/ContextAwareTableCatalog.java b/api/src/main/java/org/apache/iceberg/catalog/ContextAwareTableCatalog.java new file mode 100644 index 000000000000..22a194027995 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/catalog/ContextAwareTableCatalog.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.catalog; + +import java.util.Map; +import org.apache.iceberg.Table; +import org.apache.iceberg.exceptions.NoSuchTableException; + +/** + * Extension interface for Catalog that supports context-aware table loading. This enables passing + * additional context (such as view information) when loading tables. + */ +public interface ContextAwareTableCatalog { + + /** Context key for the view identifier that references a table */ + String VIEW_IDENTIFIER_KEY = "view.referenced-by"; + + /** + * Load a table with additional context information. + * + *
<p>Common context keys:
+   *
+   * <ul>
+   *   <li>{@link #VIEW_IDENTIFIER_KEY}: The fully qualified identifier of the view referencing this
+   *       table
+   * </ul>
+ * + * @param identifier the table identifier to load + * @param loadingContext additional context information as key-value pairs + * @return the loaded table + * @throws NoSuchTableException if the table does not exist + */ + Table loadTable(TableIdentifier identifier, Map loadingContext) + throws NoSuchTableException; +} diff --git a/api/src/main/java/org/apache/iceberg/catalog/SessionCatalog.java b/api/src/main/java/org/apache/iceberg/catalog/SessionCatalog.java index fe29f8918531..eca92b3d1a46 100644 --- a/api/src/main/java/org/apache/iceberg/catalog/SessionCatalog.java +++ b/api/src/main/java/org/apache/iceberg/catalog/SessionCatalog.java @@ -361,4 +361,9 @@ default boolean namespaceExists(SessionContext context, Namespace namespace) { return false; } } + + default Table loadTableWithContext( + SessionContext sessionContext, TableIdentifier ident, Map context) { + return loadTable(sessionContext, ident); + } } diff --git a/core/src/main/java/org/apache/iceberg/catalog/BaseSessionCatalog.java b/core/src/main/java/org/apache/iceberg/catalog/BaseSessionCatalog.java index d6ee4d345cfa..9de0f18b9267 100644 --- a/core/src/main/java/org/apache/iceberg/catalog/BaseSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/catalog/BaseSessionCatalog.java @@ -62,7 +62,7 @@ public T withContext(SessionContext context, Function task) { return task.apply(asCatalog(context)); } - public class AsCatalog implements Catalog, SupportsNamespaces { + public class AsCatalog implements Catalog, ContextAwareTableCatalog, SupportsNamespaces { private final SessionContext context; private AsCatalog(SessionContext context) { @@ -159,5 +159,10 @@ public boolean removeProperties(Namespace namespace, Set removals) { public boolean namespaceExists(Namespace namespace) { return BaseSessionCatalog.this.namespaceExists(context, namespace); } + + @Override + public Table loadTable(TableIdentifier identifier, Map loadingContext) { + return BaseSessionCatalog.this.loadTableWithContext(context, identifier, loadingContext); + } } } diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java index aff8832c6bf4..6c9c75e3f2d7 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java @@ -30,6 +30,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.Transaction; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.ContextAwareTableCatalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SessionCatalog; import org.apache.iceberg.catalog.SupportsNamespaces; @@ -45,7 +46,12 @@ import org.apache.iceberg.view.ViewBuilder; public class RESTCatalog - implements Catalog, ViewCatalog, SupportsNamespaces, Configurable, Closeable { + implements Catalog, + ContextAwareTableCatalog, + ViewCatalog, + SupportsNamespaces, + Configurable, + Closeable { private final RESTSessionCatalog sessionCatalog; private final Catalog delegate; private final SupportsNamespaces nsDelegate; @@ -120,6 +126,15 @@ public Table loadTable(TableIdentifier ident) { return delegate.loadTable(ident); } + @Override + public Table loadTable(TableIdentifier identifier, Map loadingContext) { + if (delegate instanceof ContextAwareTableCatalog) { + ContextAwareTableCatalog catalog = (ContextAwareTableCatalog) delegate; + return catalog.loadTable(identifier, loadingContext); + } + return loadTable(identifier); + } + @Override public void 
invalidateTable(TableIdentifier ident) { delegate.invalidateTable(ident); diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index 61e25d3d4fc6..ee4e51a6ab1e 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -21,6 +21,7 @@ import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.Arrays; import java.util.List; import java.util.Locale; import java.util.Map; @@ -46,6 +47,7 @@ import org.apache.iceberg.Transactions; import org.apache.iceberg.catalog.BaseViewSessionCatalog; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.ContextAwareTableCatalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableCommit; import org.apache.iceberg.catalog.TableIdentifier; @@ -396,21 +398,54 @@ private static Map snapshotModeToParam(SnapshotMode mode) { } private LoadTableResponse loadInternal( - SessionContext context, TableIdentifier identifier, SnapshotMode mode) { + SessionContext context, + TableIdentifier identifier, + SnapshotMode mode, + Map viewContext) { Endpoint.check(endpoints, Endpoint.V1_LOAD_TABLE); AuthSession contextualSession = authManager.contextualSession(context, catalogAuth); + return client .withAuthSession(contextualSession) .get( paths.table(identifier), - snapshotModeToParam(mode), + referencedByToQueryParam(mode.params(), viewContext), LoadTableResponse.class, Map.of(), ErrorHandlers.tableErrorHandler()); } + private Map referencedByToQueryParam( + Map params, Map context) { + if (context.isEmpty() || !context.containsKey(ContextAwareTableCatalog.VIEW_IDENTIFIER_KEY)) { + return params; + } + + Map queryParams = Maps.newHashMap(params); + Object viewIdentifierObj = context.get(ContextAwareTableCatalog.VIEW_IDENTIFIER_KEY); + + if (!(viewIdentifierObj instanceof String)) { + throw new IllegalStateException("Invalid view identifier in context: " + viewIdentifierObj); + } + + String[] parts = (viewIdentifierObj.toString()).split("\\."); + if (parts.length < 2) { + throw new IllegalStateException("Invalid view identifier in context: " + viewIdentifierObj); + } + + String[] namespaceParts = Arrays.copyOf(parts, parts.length - 1); + String viewName = parts[parts.length - 1]; + + String encodedNamespace = RESTUtil.encodeNamespace(Namespace.of(namespaceParts)); + String encodedViewName = RESTUtil.encodeString("." 
+ viewName); + queryParams.put("referenced-by", encodedNamespace + encodedViewName); + + return queryParams; + } + @Override - public Table loadTable(SessionContext context, TableIdentifier identifier) { + public Table loadTableWithContext( + SessionContext sessionContext, TableIdentifier identifier, Map context) { Endpoint.check( endpoints, Endpoint.V1_LOAD_TABLE, @@ -418,24 +453,22 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { new NoSuchTableException( "Unable to load table %s.%s: Server does not support endpoint %s", name(), identifier, Endpoint.V1_LOAD_TABLE)); - checkIdentifierIsValid(identifier); MetadataTableType metadataType; LoadTableResponse response; - TableIdentifier loadedIdent; + TableIdentifier loadedIdent = identifier; + try { - response = loadInternal(context, identifier, snapshotMode); - loadedIdent = identifier; + response = loadInternal(sessionContext, identifier, snapshotMode, context); metadataType = null; - } catch (NoSuchTableException original) { - metadataType = MetadataTableType.from(identifier.name()); + metadataType = MetadataTableType.from(loadedIdent.name()); if (metadataType != null) { // attempt to load a metadata table using the identifier's namespace as the base table - TableIdentifier baseIdent = TableIdentifier.of(identifier.namespace().levels()); + TableIdentifier baseIdent = TableIdentifier.of(loadedIdent.namespace().levels()); try { - response = loadInternal(context, baseIdent, snapshotMode); + response = loadInternal(sessionContext, baseIdent, snapshotMode, context); loadedIdent = baseIdent; } catch (NoSuchTableException ignored) { // the base table does not exist @@ -449,7 +482,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { TableIdentifier finalIdentifier = loadedIdent; Map tableConf = response.config(); - AuthSession contextualSession = authManager.contextualSession(context, catalogAuth); + AuthSession contextualSession = authManager.contextualSession(sessionContext, catalogAuth); AuthSession tableSession = authManager.tableSession(finalIdentifier, tableConf, contextualSession); TableMetadata tableMetadata; @@ -461,7 +494,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { .setPreviousFileLocation(null) .setSnapshotsSupplier( () -> - loadInternal(context, finalIdentifier, SnapshotMode.ALL) + loadInternal(sessionContext, finalIdentifier, SnapshotMode.ALL, context) .tableMetadata() .snapshots()) .discardChanges() @@ -477,7 +510,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { paths.table(finalIdentifier), Map::of, mutationHeaders, - tableFileIO(context, tableConf, response.credentials()), + tableFileIO(sessionContext, tableConf, response.credentials()), tableMetadata, endpoints); @@ -521,6 +554,11 @@ private RESTTable restTableForScanPlanning( return null; } + @Override + public Table loadTable(SessionContext context, TableIdentifier identifier) { + return loadTableWithContext(context, identifier, Map.of()); + } + private void trackFileIO(RESTTableOperations ops) { if (io != ops.io()) { fileIOTracker.track(ops); @@ -904,7 +942,7 @@ public Transaction replaceTransaction() { throw new AlreadyExistsException("View with same name already exists: %s", ident); } - LoadTableResponse response = loadInternal(context, ident, snapshotMode); + LoadTableResponse response = loadInternal(context, ident, snapshotMode, Map.of()); String fullName = fullTableName(ident); Map tableConf = response.config(); diff --git 
a/spark/v4.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala b/spark/v4.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala index ff7d20241bed..7aba7203ea92 100644 --- a/spark/v4.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala +++ b/spark/v4.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala @@ -18,6 +18,7 @@ */ package org.apache.spark.sql.catalyst.analysis +import org.apache.iceberg.spark.ContextAwareTableCatalog import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.analysis.ViewUtil.IcebergViewHelper @@ -30,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Project import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View +import org.apache.spark.sql.catalyst.plans.logical.views.UnResolvedRelationFromView import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.CurrentOrigin import org.apache.spark.sql.catalyst.trees.Origin @@ -37,6 +39,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager import org.apache.spark.sql.connector.catalog.LookupCatalog import org.apache.spark.sql.connector.catalog.View import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.types.MetadataBuilder case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog { @@ -62,6 +65,28 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look .map(_ => ResolvedV2View(catalog.asViewCatalog, ident)) .getOrElse(u) + case u @ UnResolvedRelationFromView( + tableParts @ CatalogAndIdentifier(catalog, tableIdent), viewParts, options, isStreaming) => + val context = new java.util.HashMap[String, Object]() + context.put( + org.apache.iceberg.catalog.ContextAwareTableCatalog.VIEW_IDENTIFIER_KEY, viewParts.mkString(".")) + try { + catalog match { + case contextAwareCatalog: ContextAwareTableCatalog => + val table = contextAwareCatalog.loadTable(tableIdent, context) + DataSourceV2Relation.create(table, Some(catalog), Some(tableIdent), options) + case catalog if catalog.asTableCatalog.isInstanceOf[ContextAwareTableCatalog] => + val table = + catalog.asTableCatalog.asInstanceOf[ContextAwareTableCatalog].loadTable(tableIdent, context) + DataSourceV2Relation.create(table, Some(catalog), Some(tableIdent), options) + case _ => + val table = catalog.asTableCatalog.loadTable(tableIdent) + DataSourceV2Relation.create(table, Some(catalog), Some(tableIdent), options) + } + } catch { + case _: Throwable => UnresolvedRelation(tableParts, options, isStreaming) + } + case c @ CreateIcebergView( ResolvedIdentifier(_, _), _, @@ -107,7 +132,8 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look // Apply any necessary rewrites to preserve correct resolution val viewCatalogAndNamespace: Seq[String] = view.currentCatalog +: view.currentNamespace.toSeq - val rewritten = rewriteIdentifiers(parsed, viewCatalogAndNamespace); + val qualifiedNameParts = Seq(view.currentNamespace.mkString("."), nameParts.last) + val rewritten = rewriteIdentifiers(parsed, viewCatalogAndNamespace, Some(qualifiedNameParts)) // Apply the field 
aliases and column comments // This logic differs from how Spark handles views in SessionCatalog.fromCatalogTable. @@ -135,14 +161,16 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look } private def rewriteIdentifiers( - plan: LogicalPlan, - catalogAndNamespace: Seq[String]): LogicalPlan = { + plan: LogicalPlan, + catalogAndNamespace: Seq[String], + viewIdentifier: Option[Seq[String]] = None): LogicalPlan = { // Substitute CTEs and Unresolved Ordinals within the view, then rewrite unresolved functions and relations qualifyTableIdentifiers( qualifyFunctionIdentifiers( SubstituteUnresolvedOrdinals.apply(CTESubstitution.apply(plan)), catalogAndNamespace), - catalogAndNamespace) + catalogAndNamespace, + viewIdentifier) } private def qualifyFunctionIdentifiers( @@ -162,18 +190,30 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look * Qualify table identifiers with default catalog and namespace if necessary. */ private def qualifyTableIdentifiers( - child: LogicalPlan, - catalogAndNamespace: Seq[String]): LogicalPlan = + child: LogicalPlan, + catalogAndNamespace: Seq[String], + viewIdentifier: Option[Seq[String]]): LogicalPlan = { child transform { - case u @ UnresolvedRelation(Seq(table), _, _) => - u.copy(multipartIdentifier = catalogAndNamespace :+ table) - case u @ UnresolvedRelation(parts, _, _) if !isCatalog(parts.head) => - u.copy(multipartIdentifier = catalogAndNamespace.head +: parts) + case u @ UnresolvedRelation(parts, options, isStreaming) => + val qualifiedTableId = parts match { + case Seq(table) => catalogAndNamespace :+ table + case _ if !isCatalog(parts.head) => catalogAndNamespace.head +: parts + case _ => parts // fallback for other cases + } + + viewIdentifier match { + case Some(viewId) => + UnResolvedRelationFromView(qualifiedTableId, viewId, options, isStreaming) + case _ => + u.copy(multipartIdentifier = qualifiedTableId) + } case other => - other.transformExpressions { case subquery: SubqueryExpression => - subquery.withNewPlan(qualifyTableIdentifiers(subquery.plan, catalogAndNamespace)) + other.transformExpressions { + case subquery: SubqueryExpression => + subquery.withNewPlan(qualifyTableIdentifiers(subquery.plan, catalogAndNamespace, viewIdentifier)) } } + } private def isCatalog(name: String): Boolean = { catalogManager.isCatalogRegistered(name) diff --git a/spark/v4.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/UnResolvedRelationFromView.scala b/spark/v4.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/UnResolvedRelationFromView.scala new file mode 100644 index 000000000000..1fa0b3a3225b --- /dev/null +++ b/spark/v4.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/UnResolvedRelationFromView.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.spark.sql.catalyst.plans.logical.views + +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.LeafNode +import org.apache.spark.sql.catalyst.util.quoteIfNeeded +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +/** + * Represents an unresolved table/relation that was referenced from within a view. + * This node carries both the table identifier and the view identifier that + * referenced it, enabling view-aware authorization and security mechanisms. + * + * This allows catalogs to implement different security models such as: + * - Definer rights: Use view creator's permissions to access the underlying table + * - Invoker rights: Use current user's permissions to access the underlying table + * + * When passed to REST catalogs, the view identifier is encoded using unit separator + * character (0x1F) for nested namespaces in the "referenced-by" query parameter. + * + * @param tableMultipartIdentifier The multipart identifier for the target table + * @param viewMultipartIdentifier The multipart identifier for the view that references this table + * @param options Options passed to the table (similar to UnresolvedRelation) + * @param isStreaming Whether this is a streaming relation + */ +case class UnResolvedRelationFromView( + tableMultipartIdentifier: Seq[String], + viewMultipartIdentifier: Seq[String], + options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty(), + override val isStreaming: Boolean = false) extends LeafNode { + + override def output: Seq[Attribute] = Nil + + override lazy val resolved = false + + def tableName: String = tableMultipartIdentifier.map(quoteIfNeeded).mkString(".") + + def viewName: String = viewMultipartIdentifier.map(quoteIfNeeded).mkString(".") + + override def simpleString(maxFields: Int): String = { + s"'UnresolvedRelationFromView [table=$tableName, view=$viewName, " + + s"${if (isStreaming) "streaming=true, " else ""}options=$options]" + } + + override def toString: String = { + s"UnresolvedRelationFromView([${tableMultipartIdentifier.mkString(", ")}], " + + s"[${viewMultipartIdentifier.mkString(", ")}], $isStreaming)" + } +} diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/ContextAwareTableCatalog.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/ContextAwareTableCatalog.java new file mode 100644 index 000000000000..45da2d833f9f --- /dev/null +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/ContextAwareTableCatalog.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark; + +import java.util.Map; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.Table; + +public interface ContextAwareTableCatalog { + + Table loadTable(Identifier identifier, Map loadingContext) + throws NoSuchTableException; + + Table loadTable(Identifier identifier, String version, Map loadingContext) + throws NoSuchTableException; + + Table loadTable(Identifier identifier, long timestamp, Map loadingContext) + throws NoSuchTableException; +} diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index da22607d05b0..94a90952b446 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -121,7 +121,7 @@ * *

*/ -public class SparkCatalog extends BaseCatalog { +public class SparkCatalog extends BaseCatalog implements ContextAwareTableCatalog { private static final Set DEFAULT_NS_KEYS = ImmutableSet.of(TableCatalog.PROP_OWNER); private static final Splitter COMMA = Splitter.on(","); private static final Joiner COMMA_JOINER = Joiner.on(","); @@ -167,8 +167,14 @@ protected TableIdentifier buildIdentifier(Identifier identifier) { @Override public Table loadTable(Identifier ident) throws NoSuchTableException { + return loadTable(ident, Map.of()); + } + + @Override + public Table loadTable(Identifier ident, Map context) + throws NoSuchTableException { try { - return load(ident); + return load(ident, context); } catch (org.apache.iceberg.exceptions.NoSuchTableException e) { throw new NoSuchTableException(ident); } @@ -176,7 +182,13 @@ public Table loadTable(Identifier ident) throws NoSuchTableException { @Override public Table loadTable(Identifier ident, String version) throws NoSuchTableException { - Table table = loadTable(ident); + return loadTable(ident, version, Map.of()); + } + + @Override + public Table loadTable(Identifier ident, String version, Map loadingContext) + throws NoSuchTableException { + Table table = load(ident, loadingContext); if (table instanceof SparkTable) { SparkTable sparkTable = (SparkTable) table; @@ -211,7 +223,13 @@ public Table loadTable(Identifier ident, String version) throws NoSuchTableExcep @Override public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException { - Table table = loadTable(ident); + return loadTable(ident, timestamp, Map.of()); + } + + @Override + public Table loadTable(Identifier ident, long timestamp, Map loadingContext) + throws NoSuchTableException { + Table table = load(ident, loadingContext); if (table instanceof SparkTable) { SparkTable sparkTable = (SparkTable) table; @@ -858,13 +876,14 @@ private static void checkNotPathIdentifier(Identifier identifier, String method) } } - private Table load(Identifier ident) { + private Table load(Identifier ident, Map context) { if (isPathIdentifier(ident)) { return loadFromPathIdentifier((PathIdentifier) ident); } try { - org.apache.iceberg.Table table = icebergCatalog.loadTable(buildIdentifier(ident)); + org.apache.iceberg.Table table = load(buildIdentifier(ident), context); + return new SparkTable(table, !cacheEnabled); } catch (org.apache.iceberg.exceptions.NoSuchTableException e) { @@ -877,7 +896,7 @@ private Table load(Identifier ident) { TableIdentifier namespaceAsIdent = buildIdentifier(namespaceToIdentifier(ident.namespace())); org.apache.iceberg.Table table; try { - table = icebergCatalog.loadTable(namespaceAsIdent); + table = load(namespaceAsIdent, context); } catch (Exception ignored) { // the namespace does not identify a table, so it cannot be a table with a snapshot selector // throw the original exception @@ -927,6 +946,16 @@ private Table load(Identifier ident) { } } + private org.apache.iceberg.Table load(TableIdentifier ident, Map context) { + if (icebergCatalog instanceof org.apache.iceberg.catalog.ContextAwareTableCatalog + && !context.isEmpty()) { + return ((org.apache.iceberg.catalog.ContextAwareTableCatalog) icebergCatalog) + .loadTable(ident, context); + } else { + return icebergCatalog.loadTable(ident); + } + } + private Pair> parseLocationString(String location) { int hashIndex = location.lastIndexOf('#'); if (hashIndex != -1 && !location.endsWith("#")) { diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java index f49660a9f27c..cb0f6a9dd3ac 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java @@ -63,7 +63,7 @@ */ public class SparkSessionCatalog< T extends TableCatalog & FunctionCatalog & SupportsNamespaces & ViewCatalog> - extends BaseCatalog implements CatalogExtension { + extends BaseCatalog implements CatalogExtension, ContextAwareTableCatalog { private static final String[] DEFAULT_NAMESPACE = new String[] {"default"}; private String catalogName = null; @@ -143,28 +143,63 @@ public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceExcepti @Override public Table loadTable(Identifier ident) throws NoSuchTableException { + return loadTable(ident, Map.of()); + } + + @Override + public Table loadTable(Identifier identifier, Map context) + throws NoSuchTableException { try { - return icebergCatalog.loadTable(ident); - } catch (NoSuchTableException e) { - return getSessionCatalog().loadTable(ident); + if (icebergCatalog instanceof ContextAwareTableCatalog && !context.isEmpty()) { + return ((ContextAwareTableCatalog) icebergCatalog) + .loadTable(identifier, context); + } else { + return icebergCatalog.loadTable(identifier); + } + } catch (org.apache.iceberg.exceptions.NoSuchTableException | NoSuchTableException e) { + return getSessionCatalog().loadTable(identifier); } } + @Override + public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException { + return loadTable(ident, timestamp, Map.of()); + } + @Override public Table loadTable(Identifier ident, String version) throws NoSuchTableException { + return loadTable(ident, version, Map.of()); + } + + @Override + public Table loadTable( + Identifier identifier, String version, Map loadingContext) + throws NoSuchTableException { try { - return icebergCatalog.loadTable(ident, version); - } catch (org.apache.iceberg.exceptions.NoSuchTableException e) { - return getSessionCatalog().loadTable(ident, version); + if (icebergCatalog instanceof ContextAwareTableCatalog && !loadingContext.isEmpty()) { + return ((ContextAwareTableCatalog) icebergCatalog) + .loadTable(identifier, version, loadingContext); + } else { + return icebergCatalog.loadTable(identifier, version); + } + } catch (org.apache.iceberg.exceptions.NoSuchTableException | NoSuchTableException e) { + return getSessionCatalog().loadTable(identifier, version); } } @Override - public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException { + public Table loadTable( + Identifier identifier, long timestamp, Map loadingContext) + throws NoSuchTableException { try { - return icebergCatalog.loadTable(ident, timestamp); - } catch (org.apache.iceberg.exceptions.NoSuchTableException e) { - return getSessionCatalog().loadTable(ident, timestamp); + if (icebergCatalog instanceof ContextAwareTableCatalog && !loadingContext.isEmpty()) { + return ((ContextAwareTableCatalog) icebergCatalog) + .loadTable(identifier, timestamp, loadingContext); + } else { + return icebergCatalog.loadTable(identifier, timestamp); + } + } catch (org.apache.iceberg.exceptions.NoSuchTableException | NoSuchTableException e) { + return getSessionCatalog().loadTable(identifier, timestamp); } } From 57e3e3069d81cb5d8760731ec09298b865b413e5 Mon Sep 17 00:00:00 2001 From: Prashant Singh Date: Mon, 3 Nov 2025 04:16:11 -0800 Subject: [PATCH 2/2] Address review feedback --- 
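Reviewer note (not part of the diff): a minimal sketch of the "referenced-by" value that this
revision's referencedByToQueryParam produces, mirroring the loop in RESTSessionCatalog. It relies
on RESTUtil.encodeNamespace joining namespace levels with the URL-encoded unit separator (%1F) and
RESTUtil.encodeString percent-encoding reserved characters such as commas (%2C); the identifiers
below are purely illustrative.

import java.util.List;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.RESTUtil;

public class ReferencedByEncodingExample {
  public static void main(String[] args) {
    // Outermost view first, then the view that directly references the table.
    List<TableIdentifier> views =
        List.of(
            TableIdentifier.of(Namespace.of("db", "ns1"), "outer_view"),
            TableIdentifier.of(Namespace.of("db"), "inner_view"));

    StringBuilder value = new StringBuilder();
    for (TableIdentifier viewId : views) {
      if (value.length() > 0) {
        value.append(','); // multiple views are comma-separated
      }
      value
          .append(RESTUtil.encodeNamespace(viewId.namespace())) // e.g. "db%1Fns1"
          .append('.') // unencoded dot between namespace and view name
          .append(RESTUtil.encodeString(viewId.name()));
    }

    // Prints: referenced-by=db%1Fns1.outer_view,db.inner_view
    System.out.println("referenced-by=" + value);
  }
}

Because %1F and %2C cannot appear unescaped inside the encoded parts, the server can recover each
view identifier from the parameter unambiguously.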
.../catalog/ContextAwareTableCatalog.java | 15 ++++- .../iceberg/rest/RESTSessionCatalog.java | 42 ++++++++---- .../sql/catalyst/analysis/ResolveViews.scala | 64 +++++++++++++++---- .../views/UnResolvedRelationFromView.scala | 22 +++---- .../iceberg/spark/SparkSessionCatalog.java | 9 +-- 5 files changed, 106 insertions(+), 46 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/catalog/ContextAwareTableCatalog.java b/api/src/main/java/org/apache/iceberg/catalog/ContextAwareTableCatalog.java index 22a194027995..898d455be5a2 100644 --- a/api/src/main/java/org/apache/iceberg/catalog/ContextAwareTableCatalog.java +++ b/api/src/main/java/org/apache/iceberg/catalog/ContextAwareTableCatalog.java @@ -28,7 +28,18 @@ */ public interface ContextAwareTableCatalog { - /** Context key for the view identifier that references a table */ + /** + * Context key for the view identifier(s) that reference the table being loaded. + * + *
<p>The value should be a {@code List<TableIdentifier>} representing the view(s) that reference
+   * the table. For nested views (a view referencing another view that references the table), the
+   * list should be ordered from the outermost view to the view that directly references the table.
+   *
+   * <p>This structured format keeps namespace parts and view names separate throughout the catalog
+   * layer; it is serialized into the REST API format only when the actual HTTP request is made,
+   * per the spec: namespace parts joined by the namespace separator (default 0x1F), followed by
+   * a dot, followed by the view name, with multiple views comma-separated.
+   */
   String VIEW_IDENTIFIER_KEY = "view.referenced-by";
 
   /**
@@ -37,7 +48,7 @@ public interface ContextAwareTableCatalog {
    *
    * <p>Common context keys:
    *
    * <ul>
-   *   <li>{@link #VIEW_IDENTIFIER_KEY}: The fully qualified identifier of the view referencing this
+   *   <li>{@link #VIEW_IDENTIFIER_KEY}: A {@code List<TableIdentifier>} of view(s) referencing this
    *       table
    * </ul>
* diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index ee4e51a6ab1e..e78631966cff 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -21,7 +21,6 @@ import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; -import java.util.Arrays; import java.util.List; import java.util.Locale; import java.util.Map; @@ -409,7 +408,7 @@ private LoadTableResponse loadInternal( .withAuthSession(contextualSession) .get( paths.table(identifier), - referencedByToQueryParam(mode.params(), viewContext), + referencedByToQueryParam(snapshotModeToParam(mode), viewContext), LoadTableResponse.class, Map.of(), ErrorHandlers.tableErrorHandler()); @@ -424,21 +423,40 @@ private Map referencedByToQueryParam( Map queryParams = Maps.newHashMap(params); Object viewIdentifierObj = context.get(ContextAwareTableCatalog.VIEW_IDENTIFIER_KEY); - if (!(viewIdentifierObj instanceof String)) { - throw new IllegalStateException("Invalid view identifier in context: " + viewIdentifierObj); + if (!(viewIdentifierObj instanceof List)) { + throw new IllegalStateException( + "Invalid view identifier in context, expected List: " + + viewIdentifierObj); } - String[] parts = (viewIdentifierObj.toString()).split("\\."); - if (parts.length < 2) { - throw new IllegalStateException("Invalid view identifier in context: " + viewIdentifierObj); + @SuppressWarnings("unchecked") + List viewIdentifiers = (List) viewIdentifierObj; + + if (viewIdentifiers.isEmpty()) { + return params; } - String[] namespaceParts = Arrays.copyOf(parts, parts.length - 1); - String viewName = parts[parts.length - 1]; + // Format per REST API spec: + // - Namespace parts joined by namespace separator (0x1F, URL-encoded as %1F) + // - Namespace and view name separated by dot (.) + // - Multiple views separated by comma (,) + // - Commas in view names must be percent-encoded as %2C + List formattedViews = Lists.newArrayListWithCapacity(viewIdentifiers.size()); + for (TableIdentifier viewId : viewIdentifiers) { + // Encode namespace using RESTUtil which handles the namespace separator + String encodedNamespace = RESTUtil.encodeNamespace(viewId.namespace()); + + // Encode view name - commas must be encoded as %2C + String encodedViewName = RESTUtil.encodeString(viewId.name()); + + // Combine: encodedNamespace + "." + encodedViewName + // Note: RESTUtil.encodeString already encodes the dot if present in the view name, + // but we add an unencoded dot as the separator + formattedViews.add(encodedNamespace + "." + encodedViewName); + } - String encodedNamespace = RESTUtil.encodeNamespace(Namespace.of(namespaceParts)); - String encodedViewName = RESTUtil.encodeString("." 
+ viewName); - queryParams.put("referenced-by", encodedNamespace + encodedViewName); + // Join multiple views with comma + queryParams.put("referenced-by", String.join(",", formattedViews)); return queryParams; } diff --git a/spark/v4.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala b/spark/v4.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala index 7aba7203ea92..ab335e4176ed 100644 --- a/spark/v4.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala +++ b/spark/v4.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala @@ -18,6 +18,9 @@ */ package org.apache.spark.sql.catalyst.analysis +import org.apache.iceberg.catalog.{Namespace => IcebergNamespace} +import org.apache.iceberg.catalog.{TableIdentifier => IcebergTableIdentifier} +import org.apache.iceberg.catalog.{ContextAwareTableCatalog => IcebergContextAwareTableCatalog} import org.apache.iceberg.spark.ContextAwareTableCatalog import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.FunctionIdentifier @@ -66,10 +69,39 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look .getOrElse(u) case u @ UnResolvedRelationFromView( - tableParts @ CatalogAndIdentifier(catalog, tableIdent), viewParts, options, isStreaming) => + tableParts @ CatalogAndIdentifier(catalog, tableIdent), + viewParts, + options, + isStreaming) => val context = new java.util.HashMap[String, Object]() - context.put( - org.apache.iceberg.catalog.ContextAwareTableCatalog.VIEW_IDENTIFIER_KEY, viewParts.mkString(".")) + + // Parse viewParts into namespace and view name + // viewParts format: Seq("namespace_part1", "namespace_part2", ..., "viewName") + // Note: catalog is NOT included in viewParts, it's already resolved via the catalog parameter + if (viewParts.nonEmpty) { + if (viewParts.length == 1) { + // Single part means view name only, no namespace + val viewName = viewParts.head + val namespace = IcebergNamespace.empty() + val viewIdentifier = IcebergTableIdentifier.of(namespace, viewName) + + context.put( + IcebergContextAwareTableCatalog.VIEW_IDENTIFIER_KEY, + java.util.Collections.singletonList(viewIdentifier)) + } else { + // Multiple parts: all but last are namespace, last is view name + val namespaceParts = viewParts.dropRight(1) + val viewName = viewParts.last + + val namespace = IcebergNamespace.of(namespaceParts: _*) + val viewIdentifier = IcebergTableIdentifier.of(namespace, viewName) + + context.put( + IcebergContextAwareTableCatalog.VIEW_IDENTIFIER_KEY, + java.util.Collections.singletonList(viewIdentifier)) + } + } + try { catalog match { case contextAwareCatalog: ContextAwareTableCatalog => @@ -77,7 +109,9 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look DataSourceV2Relation.create(table, Some(catalog), Some(tableIdent), options) case catalog if catalog.asTableCatalog.isInstanceOf[ContextAwareTableCatalog] => val table = - catalog.asTableCatalog.asInstanceOf[ContextAwareTableCatalog].loadTable(tableIdent, context) + catalog.asTableCatalog + .asInstanceOf[ContextAwareTableCatalog] + .loadTable(tableIdent, context) DataSourceV2Relation.create(table, Some(catalog), Some(tableIdent), options) case _ => val table = catalog.asTableCatalog.loadTable(tableIdent) @@ -132,7 +166,9 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look // Apply any necessary rewrites to preserve correct resolution val 
viewCatalogAndNamespace: Seq[String] = view.currentCatalog +: view.currentNamespace.toSeq - val qualifiedNameParts = Seq(view.currentNamespace.mkString("."), nameParts.last) + // qualifiedNameParts: Seq of namespace parts and view name (no catalog) + // This will be passed as viewParts to UnResolvedRelationFromView + val qualifiedNameParts = view.currentNamespace.toSeq :+ nameParts.last val rewritten = rewriteIdentifiers(parsed, viewCatalogAndNamespace, Some(qualifiedNameParts)) // Apply the field aliases and column comments @@ -161,9 +197,9 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look } private def rewriteIdentifiers( - plan: LogicalPlan, - catalogAndNamespace: Seq[String], - viewIdentifier: Option[Seq[String]] = None): LogicalPlan = { + plan: LogicalPlan, + catalogAndNamespace: Seq[String], + viewIdentifier: Option[Seq[String]] = None): LogicalPlan = { // Substitute CTEs and Unresolved Ordinals within the view, then rewrite unresolved functions and relations qualifyTableIdentifiers( qualifyFunctionIdentifiers( @@ -190,9 +226,9 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look * Qualify table identifiers with default catalog and namespace if necessary. */ private def qualifyTableIdentifiers( - child: LogicalPlan, - catalogAndNamespace: Seq[String], - viewIdentifier: Option[Seq[String]]): LogicalPlan = { + child: LogicalPlan, + catalogAndNamespace: Seq[String], + viewIdentifier: Option[Seq[String]]): LogicalPlan = { child transform { case u @ UnresolvedRelation(parts, options, isStreaming) => val qualifiedTableId = parts match { @@ -208,9 +244,9 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look u.copy(multipartIdentifier = qualifiedTableId) } case other => - other.transformExpressions { - case subquery: SubqueryExpression => - subquery.withNewPlan(qualifyTableIdentifiers(subquery.plan, catalogAndNamespace, viewIdentifier)) + other.transformExpressions { case subquery: SubqueryExpression => + subquery.withNewPlan( + qualifyTableIdentifiers(subquery.plan, catalogAndNamespace, viewIdentifier)) } } } diff --git a/spark/v4.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/UnResolvedRelationFromView.scala b/spark/v4.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/UnResolvedRelationFromView.scala index 1fa0b3a3225b..67671f5916d5 100644 --- a/spark/v4.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/UnResolvedRelationFromView.scala +++ b/spark/v4.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/UnResolvedRelationFromView.scala @@ -1,18 +1,18 @@ /* * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ @@ -32,19 +32,17 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap * - Definer rights: Use view creator's permissions to access the underlying table * - Invoker rights: Use current user's permissions to access the underlying table * - * When passed to REST catalogs, the view identifier is encoded using unit separator - * character (0x1F) for nested namespaces in the "referenced-by" query parameter. - * * @param tableMultipartIdentifier The multipart identifier for the target table * @param viewMultipartIdentifier The multipart identifier for the view that references this table * @param options Options passed to the table (similar to UnresolvedRelation) * @param isStreaming Whether this is a streaming relation */ case class UnResolvedRelationFromView( - tableMultipartIdentifier: Seq[String], - viewMultipartIdentifier: Seq[String], - options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty(), - override val isStreaming: Boolean = false) extends LeafNode { + tableMultipartIdentifier: Seq[String], + viewMultipartIdentifier: Seq[String], + options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty(), + override val isStreaming: Boolean = false) + extends LeafNode { override def output: Seq[Attribute] = Nil diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java index cb0f6a9dd3ac..67b6252a53d1 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java @@ -151,8 +151,7 @@ public Table loadTable(Identifier identifier, Map context) throws NoSuchTableException { try { if (icebergCatalog instanceof ContextAwareTableCatalog && !context.isEmpty()) { - return ((ContextAwareTableCatalog) icebergCatalog) - .loadTable(identifier, context); + return ((ContextAwareTableCatalog) icebergCatalog).loadTable(identifier, context); } else { return icebergCatalog.loadTable(identifier); } @@ -172,8 +171,7 @@ public Table loadTable(Identifier ident, String version) throws NoSuchTableExcep } @Override - public Table loadTable( - Identifier identifier, String version, Map loadingContext) + public Table loadTable(Identifier identifier, String version, Map loadingContext) throws NoSuchTableException { try { if (icebergCatalog instanceof ContextAwareTableCatalog && !loadingContext.isEmpty()) { @@ -188,8 +186,7 @@ public Table loadTable( } @Override - public Table loadTable( - Identifier identifier, long timestamp, Map loadingContext) + public Table loadTable(Identifier identifier, long timestamp, Map loadingContext) throws NoSuchTableException { try { if (icebergCatalog instanceof ContextAwareTableCatalog && !loadingContext.isEmpty()) {
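Appendix (reviewer note, not part of the patch): a minimal sketch of how a custom catalog might
consume the loading context on the implementation side, assuming the context is a
Map<String, Object> whose VIEW_IDENTIFIER_KEY entry holds a List<TableIdentifier>, per the patch 2
javadoc. AuthorizingCatalog and checkDefinerRights are hypothetical names; BaseMetastoreCatalog is
extended only to inherit the single-argument loadTable.

import java.util.List;
import java.util.Map;
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.ContextAwareTableCatalog;
import org.apache.iceberg.catalog.TableIdentifier;

public abstract class AuthorizingCatalog extends BaseMetastoreCatalog
    implements ContextAwareTableCatalog {

  @Override
  public Table loadTable(TableIdentifier identifier, Map<String, Object> loadingContext) {
    Object views = loadingContext.get(VIEW_IDENTIFIER_KEY);
    if (views instanceof List) {
      // The load happens on behalf of one or more views: apply the view owner's
      // privileges (definer rights) rather than the caller's (invoker rights).
      @SuppressWarnings("unchecked")
      List<TableIdentifier> referencingViews = (List<TableIdentifier>) views;
      checkDefinerRights(referencingViews, identifier); // hypothetical authorization hook
    }
    // Fall through to the regular load path once authorization has been applied.
    return loadTable(identifier);
  }

  /** Hypothetical hook; a real catalog would consult its policy store here. */
  protected abstract void checkDefinerRights(List<TableIdentifier> views, TableIdentifier table);
}

Keeping the context optional (the plain loadTable(ident) overloads delegate with Map.of()) leaves
existing catalogs source- and behavior-compatible; only catalogs that opt into
ContextAwareTableCatalog ever see the referenced-by information.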