diff --git a/api/src/main/java/org/apache/iceberg/catalog/ContextAwareTableCatalog.java b/api/src/main/java/org/apache/iceberg/catalog/ContextAwareTableCatalog.java new file mode 100644 index 000000000000..898d455be5a2 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/catalog/ContextAwareTableCatalog.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.catalog; + +import java.util.Map; +import org.apache.iceberg.Table; +import org.apache.iceberg.exceptions.NoSuchTableException; + +/** + * Extension interface for Catalog that supports context-aware table loading. This enables passing + * additional context (such as view information) when loading tables. + */ +public interface ContextAwareTableCatalog { + + /** + * Context key for the view identifier(s) that reference the table being loaded. + * + *
<p>
The value should be a List<TableIdentifier> representing the view(s) that reference + * the table. For nested views (a view referencing another view which references the table), the + * list should be ordered from outermost view to the view that directly references the table. + * + *
<p>
This structured format keeps namespace parts and view names separate throughout the catalog + * layer, only being serialized into the REST API format when making the actual HTTP request + * according to the spec: namespace parts joined by namespace separator (default 0x1F), followed + * by dot, followed by view name, with multiple views comma-separated. + */ + String VIEW_IDENTIFIER_KEY = "view.referenced-by"; + + /** + * Load a table with additional context information. + * + *
<p>
Common context keys: + * + *
<ul>
  <li>{@link #VIEW_IDENTIFIER_KEY} - view identifier(s) that reference the table being loaded</li>
</ul>
+ * + * @param identifier the table identifier to load + * @param loadingContext additional context information as key-value pairs + * @return the loaded table + * @throws NoSuchTableException if the table does not exist + */ + Table loadTable(TableIdentifier identifier, Map loadingContext) + throws NoSuchTableException; +} diff --git a/api/src/main/java/org/apache/iceberg/catalog/SessionCatalog.java b/api/src/main/java/org/apache/iceberg/catalog/SessionCatalog.java index fe29f8918531..eca92b3d1a46 100644 --- a/api/src/main/java/org/apache/iceberg/catalog/SessionCatalog.java +++ b/api/src/main/java/org/apache/iceberg/catalog/SessionCatalog.java @@ -361,4 +361,9 @@ default boolean namespaceExists(SessionContext context, Namespace namespace) { return false; } } + + default Table loadTableWithContext( + SessionContext sessionContext, TableIdentifier ident, Map context) { + return loadTable(sessionContext, ident); + } } diff --git a/core/src/main/java/org/apache/iceberg/catalog/BaseSessionCatalog.java b/core/src/main/java/org/apache/iceberg/catalog/BaseSessionCatalog.java index d6ee4d345cfa..9de0f18b9267 100644 --- a/core/src/main/java/org/apache/iceberg/catalog/BaseSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/catalog/BaseSessionCatalog.java @@ -62,7 +62,7 @@ public T withContext(SessionContext context, Function task) { return task.apply(asCatalog(context)); } - public class AsCatalog implements Catalog, SupportsNamespaces { + public class AsCatalog implements Catalog, ContextAwareTableCatalog, SupportsNamespaces { private final SessionContext context; private AsCatalog(SessionContext context) { @@ -159,5 +159,10 @@ public boolean removeProperties(Namespace namespace, Set removals) { public boolean namespaceExists(Namespace namespace) { return BaseSessionCatalog.this.namespaceExists(context, namespace); } + + @Override + public Table loadTable(TableIdentifier identifier, Map loadingContext) { + return 
BaseSessionCatalog.this.loadTableWithContext(context, identifier, loadingContext); + } } } diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java index aff8832c6bf4..6c9c75e3f2d7 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java @@ -30,6 +30,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.Transaction; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.ContextAwareTableCatalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SessionCatalog; import org.apache.iceberg.catalog.SupportsNamespaces; @@ -45,7 +46,12 @@ import org.apache.iceberg.view.ViewBuilder; public class RESTCatalog - implements Catalog, ViewCatalog, SupportsNamespaces, Configurable, Closeable { + implements Catalog, + ContextAwareTableCatalog, + ViewCatalog, + SupportsNamespaces, + Configurable, + Closeable { private final RESTSessionCatalog sessionCatalog; private final Catalog delegate; private final SupportsNamespaces nsDelegate; @@ -120,6 +126,15 @@ public Table loadTable(TableIdentifier ident) { return delegate.loadTable(ident); } + @Override + public Table loadTable(TableIdentifier identifier, Map loadingContext) { + if (delegate instanceof ContextAwareTableCatalog) { + ContextAwareTableCatalog catalog = (ContextAwareTableCatalog) delegate; + return catalog.loadTable(identifier, loadingContext); + } + return loadTable(identifier); + } + @Override public void invalidateTable(TableIdentifier ident) { delegate.invalidateTable(ident); diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index 61e25d3d4fc6..e78631966cff 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ 
b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -46,6 +46,7 @@ import org.apache.iceberg.Transactions; import org.apache.iceberg.catalog.BaseViewSessionCatalog; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.ContextAwareTableCatalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableCommit; import org.apache.iceberg.catalog.TableIdentifier; @@ -396,21 +397,73 @@ private static Map snapshotModeToParam(SnapshotMode mode) { } private LoadTableResponse loadInternal( - SessionContext context, TableIdentifier identifier, SnapshotMode mode) { + SessionContext context, + TableIdentifier identifier, + SnapshotMode mode, + Map viewContext) { Endpoint.check(endpoints, Endpoint.V1_LOAD_TABLE); AuthSession contextualSession = authManager.contextualSession(context, catalogAuth); + return client .withAuthSession(contextualSession) .get( paths.table(identifier), - snapshotModeToParam(mode), + referencedByToQueryParam(snapshotModeToParam(mode), viewContext), LoadTableResponse.class, Map.of(), ErrorHandlers.tableErrorHandler()); } + private Map referencedByToQueryParam( + Map params, Map context) { + if (context.isEmpty() || !context.containsKey(ContextAwareTableCatalog.VIEW_IDENTIFIER_KEY)) { + return params; + } + + Map queryParams = Maps.newHashMap(params); + Object viewIdentifierObj = context.get(ContextAwareTableCatalog.VIEW_IDENTIFIER_KEY); + + if (!(viewIdentifierObj instanceof List)) { + throw new IllegalStateException( + "Invalid view identifier in context, expected List: " + + viewIdentifierObj); + } + + @SuppressWarnings("unchecked") + List viewIdentifiers = (List) viewIdentifierObj; + + if (viewIdentifiers.isEmpty()) { + return params; + } + + // Format per REST API spec: + // - Namespace parts joined by namespace separator (0x1F, URL-encoded as %1F) + // - Namespace and view name separated by dot (.) 
+ // - Multiple views separated by comma (,) + // - Commas in view names must be percent-encoded as %2C + List formattedViews = Lists.newArrayListWithCapacity(viewIdentifiers.size()); + for (TableIdentifier viewId : viewIdentifiers) { + // Encode namespace using RESTUtil which handles the namespace separator + String encodedNamespace = RESTUtil.encodeNamespace(viewId.namespace()); + + // Encode view name - commas must be encoded as %2C + String encodedViewName = RESTUtil.encodeString(viewId.name()); + + // Combine: encodedNamespace + "." + encodedViewName + // Note: RESTUtil.encodeString already encodes the dot if present in the view name, + // but we add an unencoded dot as the separator + formattedViews.add(encodedNamespace + "." + encodedViewName); + } + + // Join multiple views with comma + queryParams.put("referenced-by", String.join(",", formattedViews)); + + return queryParams; + } + @Override - public Table loadTable(SessionContext context, TableIdentifier identifier) { + public Table loadTableWithContext( + SessionContext sessionContext, TableIdentifier identifier, Map context) { Endpoint.check( endpoints, Endpoint.V1_LOAD_TABLE, @@ -418,24 +471,22 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { new NoSuchTableException( "Unable to load table %s.%s: Server does not support endpoint %s", name(), identifier, Endpoint.V1_LOAD_TABLE)); - checkIdentifierIsValid(identifier); MetadataTableType metadataType; LoadTableResponse response; - TableIdentifier loadedIdent; + TableIdentifier loadedIdent = identifier; + try { - response = loadInternal(context, identifier, snapshotMode); - loadedIdent = identifier; + response = loadInternal(sessionContext, identifier, snapshotMode, context); metadataType = null; - } catch (NoSuchTableException original) { - metadataType = MetadataTableType.from(identifier.name()); + metadataType = MetadataTableType.from(loadedIdent.name()); if (metadataType != null) { // attempt to load a metadata table using 
the identifier's namespace as the base table - TableIdentifier baseIdent = TableIdentifier.of(identifier.namespace().levels()); + TableIdentifier baseIdent = TableIdentifier.of(loadedIdent.namespace().levels()); try { - response = loadInternal(context, baseIdent, snapshotMode); + response = loadInternal(sessionContext, baseIdent, snapshotMode, context); loadedIdent = baseIdent; } catch (NoSuchTableException ignored) { // the base table does not exist @@ -449,7 +500,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { TableIdentifier finalIdentifier = loadedIdent; Map tableConf = response.config(); - AuthSession contextualSession = authManager.contextualSession(context, catalogAuth); + AuthSession contextualSession = authManager.contextualSession(sessionContext, catalogAuth); AuthSession tableSession = authManager.tableSession(finalIdentifier, tableConf, contextualSession); TableMetadata tableMetadata; @@ -461,7 +512,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { .setPreviousFileLocation(null) .setSnapshotsSupplier( () -> - loadInternal(context, finalIdentifier, SnapshotMode.ALL) + loadInternal(sessionContext, finalIdentifier, SnapshotMode.ALL, context) .tableMetadata() .snapshots()) .discardChanges() @@ -477,7 +528,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { paths.table(finalIdentifier), Map::of, mutationHeaders, - tableFileIO(context, tableConf, response.credentials()), + tableFileIO(sessionContext, tableConf, response.credentials()), tableMetadata, endpoints); @@ -521,6 +572,11 @@ private RESTTable restTableForScanPlanning( return null; } + @Override + public Table loadTable(SessionContext context, TableIdentifier identifier) { + return loadTableWithContext(context, identifier, Map.of()); + } + private void trackFileIO(RESTTableOperations ops) { if (io != ops.io()) { fileIOTracker.track(ops); @@ -904,7 +960,7 @@ public Transaction replaceTransaction() { throw new 
AlreadyExistsException("View with same name already exists: %s", ident); } - LoadTableResponse response = loadInternal(context, ident, snapshotMode); + LoadTableResponse response = loadInternal(context, ident, snapshotMode, Map.of()); String fullName = fullTableName(ident); Map tableConf = response.config(); diff --git a/spark/v4.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala b/spark/v4.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala index ff7d20241bed..ab335e4176ed 100644 --- a/spark/v4.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala +++ b/spark/v4.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala @@ -18,6 +18,10 @@ */ package org.apache.spark.sql.catalyst.analysis +import org.apache.iceberg.catalog.{Namespace => IcebergNamespace} +import org.apache.iceberg.catalog.{TableIdentifier => IcebergTableIdentifier} +import org.apache.iceberg.catalog.{ContextAwareTableCatalog => IcebergContextAwareTableCatalog} +import org.apache.iceberg.spark.ContextAwareTableCatalog import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.analysis.ViewUtil.IcebergViewHelper @@ -30,6 +34,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Project import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View +import org.apache.spark.sql.catalyst.plans.logical.views.UnResolvedRelationFromView import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.CurrentOrigin import org.apache.spark.sql.catalyst.trees.Origin @@ -37,6 +42,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager import org.apache.spark.sql.connector.catalog.LookupCatalog import 
org.apache.spark.sql.connector.catalog.View import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.types.MetadataBuilder case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog { @@ -62,6 +68,59 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look .map(_ => ResolvedV2View(catalog.asViewCatalog, ident)) .getOrElse(u) + case u @ UnResolvedRelationFromView( + tableParts @ CatalogAndIdentifier(catalog, tableIdent), + viewParts, + options, + isStreaming) => + val context = new java.util.HashMap[String, Object]() + + // Parse viewParts into namespace and view name + // viewParts format: Seq("namespace_part1", "namespace_part2", ..., "viewName") + // Note: catalog is NOT included in viewParts, it's already resolved via the catalog parameter + if (viewParts.nonEmpty) { + if (viewParts.length == 1) { + // Single part means view name only, no namespace + val viewName = viewParts.head + val namespace = IcebergNamespace.empty() + val viewIdentifier = IcebergTableIdentifier.of(namespace, viewName) + + context.put( + IcebergContextAwareTableCatalog.VIEW_IDENTIFIER_KEY, + java.util.Collections.singletonList(viewIdentifier)) + } else { + // Multiple parts: all but last are namespace, last is view name + val namespaceParts = viewParts.dropRight(1) + val viewName = viewParts.last + + val namespace = IcebergNamespace.of(namespaceParts: _*) + val viewIdentifier = IcebergTableIdentifier.of(namespace, viewName) + + context.put( + IcebergContextAwareTableCatalog.VIEW_IDENTIFIER_KEY, + java.util.Collections.singletonList(viewIdentifier)) + } + } + + try { + catalog match { + case contextAwareCatalog: ContextAwareTableCatalog => + val table = contextAwareCatalog.loadTable(tableIdent, context) + DataSourceV2Relation.create(table, Some(catalog), Some(tableIdent), options) + case catalog if 
catalog.asTableCatalog.isInstanceOf[ContextAwareTableCatalog] => + val table = + catalog.asTableCatalog + .asInstanceOf[ContextAwareTableCatalog] + .loadTable(tableIdent, context) + DataSourceV2Relation.create(table, Some(catalog), Some(tableIdent), options) + case _ => + val table = catalog.asTableCatalog.loadTable(tableIdent) + DataSourceV2Relation.create(table, Some(catalog), Some(tableIdent), options) + } + } catch { + case _: Throwable => UnresolvedRelation(tableParts, options, isStreaming) + } + case c @ CreateIcebergView( ResolvedIdentifier(_, _), _, @@ -107,7 +166,10 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look // Apply any necessary rewrites to preserve correct resolution val viewCatalogAndNamespace: Seq[String] = view.currentCatalog +: view.currentNamespace.toSeq - val rewritten = rewriteIdentifiers(parsed, viewCatalogAndNamespace); + // qualifiedNameParts: Seq of namespace parts and view name (no catalog) + // This will be passed as viewParts to UnResolvedRelationFromView + val qualifiedNameParts = view.currentNamespace.toSeq :+ nameParts.last + val rewritten = rewriteIdentifiers(parsed, viewCatalogAndNamespace, Some(qualifiedNameParts)) // Apply the field aliases and column comments // This logic differs from how Spark handles views in SessionCatalog.fromCatalogTable. 
@@ -136,13 +198,15 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look private def rewriteIdentifiers( plan: LogicalPlan, - catalogAndNamespace: Seq[String]): LogicalPlan = { + catalogAndNamespace: Seq[String], + viewIdentifier: Option[Seq[String]] = None): LogicalPlan = { // Substitute CTEs and Unresolved Ordinals within the view, then rewrite unresolved functions and relations qualifyTableIdentifiers( qualifyFunctionIdentifiers( SubstituteUnresolvedOrdinals.apply(CTESubstitution.apply(plan)), catalogAndNamespace), - catalogAndNamespace) + catalogAndNamespace, + viewIdentifier) } private def qualifyFunctionIdentifiers( @@ -163,17 +227,29 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look */ private def qualifyTableIdentifiers( child: LogicalPlan, - catalogAndNamespace: Seq[String]): LogicalPlan = + catalogAndNamespace: Seq[String], + viewIdentifier: Option[Seq[String]]): LogicalPlan = { child transform { - case u @ UnresolvedRelation(Seq(table), _, _) => - u.copy(multipartIdentifier = catalogAndNamespace :+ table) - case u @ UnresolvedRelation(parts, _, _) if !isCatalog(parts.head) => - u.copy(multipartIdentifier = catalogAndNamespace.head +: parts) + case u @ UnresolvedRelation(parts, options, isStreaming) => + val qualifiedTableId = parts match { + case Seq(table) => catalogAndNamespace :+ table + case _ if !isCatalog(parts.head) => catalogAndNamespace.head +: parts + case _ => parts // fallback for other cases + } + + viewIdentifier match { + case Some(viewId) => + UnResolvedRelationFromView(qualifiedTableId, viewId, options, isStreaming) + case _ => + u.copy(multipartIdentifier = qualifiedTableId) + } case other => other.transformExpressions { case subquery: SubqueryExpression => - subquery.withNewPlan(qualifyTableIdentifiers(subquery.plan, catalogAndNamespace)) + subquery.withNewPlan( + qualifyTableIdentifiers(subquery.plan, catalogAndNamespace, viewIdentifier)) } } + } private def 
isCatalog(name: String): Boolean = { catalogManager.isCatalogRegistered(name) diff --git a/spark/v4.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/UnResolvedRelationFromView.scala b/spark/v4.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/UnResolvedRelationFromView.scala new file mode 100644 index 000000000000..67671f5916d5 --- /dev/null +++ b/spark/v4.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/UnResolvedRelationFromView.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.spark.sql.catalyst.plans.logical.views + +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.LeafNode +import org.apache.spark.sql.catalyst.util.quoteIfNeeded +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +/** + * Represents an unresolved table/relation that was referenced from within a view. + * This node carries both the table identifier and the view identifier that + * referenced it, enabling view-aware authorization and security mechanisms. 
+ * + * This allows catalogs to implement different security models such as: + * - Definer rights: Use view creator's permissions to access the underlying table + * - Invoker rights: Use current user's permissions to access the underlying table + * + * @param tableMultipartIdentifier The multipart identifier for the target table + * @param viewMultipartIdentifier The multipart identifier for the view that references this table + * @param options Options passed to the table (similar to UnresolvedRelation) + * @param isStreaming Whether this is a streaming relation + */ +case class UnResolvedRelationFromView( + tableMultipartIdentifier: Seq[String], + viewMultipartIdentifier: Seq[String], + options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty(), + override val isStreaming: Boolean = false) + extends LeafNode { + + override def output: Seq[Attribute] = Nil + + override lazy val resolved = false + + def tableName: String = tableMultipartIdentifier.map(quoteIfNeeded).mkString(".") + + def viewName: String = viewMultipartIdentifier.map(quoteIfNeeded).mkString(".") + + override def simpleString(maxFields: Int): String = { + s"'UnresolvedRelationFromView [table=$tableName, view=$viewName, " + + s"${if (isStreaming) "streaming=true, " else ""}options=$options]" + } + + override def toString: String = { + s"UnresolvedRelationFromView([${tableMultipartIdentifier.mkString(", ")}], " + + s"[${viewMultipartIdentifier.mkString(", ")}], $isStreaming)" + } +} diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/ContextAwareTableCatalog.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/ContextAwareTableCatalog.java new file mode 100644 index 000000000000..45da2d833f9f --- /dev/null +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/ContextAwareTableCatalog.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark; + +import java.util.Map; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.Table; + +public interface ContextAwareTableCatalog { + + Table loadTable(Identifier identifier, Map loadingContext) + throws NoSuchTableException; + + Table loadTable(Identifier identifier, String version, Map loadingContext) + throws NoSuchTableException; + + Table loadTable(Identifier identifier, long timestamp, Map loadingContext) + throws NoSuchTableException; +} diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index da22607d05b0..94a90952b446 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -121,7 +121,7 @@ * *

*/ -public class SparkCatalog extends BaseCatalog { +public class SparkCatalog extends BaseCatalog implements ContextAwareTableCatalog { private static final Set DEFAULT_NS_KEYS = ImmutableSet.of(TableCatalog.PROP_OWNER); private static final Splitter COMMA = Splitter.on(","); private static final Joiner COMMA_JOINER = Joiner.on(","); @@ -167,8 +167,14 @@ protected TableIdentifier buildIdentifier(Identifier identifier) { @Override public Table loadTable(Identifier ident) throws NoSuchTableException { + return loadTable(ident, Map.of()); + } + + @Override + public Table loadTable(Identifier ident, Map context) + throws NoSuchTableException { try { - return load(ident); + return load(ident, context); } catch (org.apache.iceberg.exceptions.NoSuchTableException e) { throw new NoSuchTableException(ident); } @@ -176,7 +182,13 @@ public Table loadTable(Identifier ident) throws NoSuchTableException { @Override public Table loadTable(Identifier ident, String version) throws NoSuchTableException { - Table table = loadTable(ident); + return loadTable(ident, version, Map.of()); + } + + @Override + public Table loadTable(Identifier ident, String version, Map loadingContext) + throws NoSuchTableException { + Table table = load(ident, loadingContext); if (table instanceof SparkTable) { SparkTable sparkTable = (SparkTable) table; @@ -211,7 +223,13 @@ public Table loadTable(Identifier ident, String version) throws NoSuchTableExcep @Override public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException { - Table table = loadTable(ident); + return loadTable(ident, timestamp, Map.of()); + } + + @Override + public Table loadTable(Identifier ident, long timestamp, Map loadingContext) + throws NoSuchTableException { + Table table = load(ident, loadingContext); if (table instanceof SparkTable) { SparkTable sparkTable = (SparkTable) table; @@ -858,13 +876,14 @@ private static void checkNotPathIdentifier(Identifier identifier, String method) } } - private Table 
load(Identifier ident) { + private Table load(Identifier ident, Map context) { if (isPathIdentifier(ident)) { return loadFromPathIdentifier((PathIdentifier) ident); } try { - org.apache.iceberg.Table table = icebergCatalog.loadTable(buildIdentifier(ident)); + org.apache.iceberg.Table table = load(buildIdentifier(ident), context); + return new SparkTable(table, !cacheEnabled); } catch (org.apache.iceberg.exceptions.NoSuchTableException e) { @@ -877,7 +896,7 @@ private Table load(Identifier ident) { TableIdentifier namespaceAsIdent = buildIdentifier(namespaceToIdentifier(ident.namespace())); org.apache.iceberg.Table table; try { - table = icebergCatalog.loadTable(namespaceAsIdent); + table = load(namespaceAsIdent, context); } catch (Exception ignored) { // the namespace does not identify a table, so it cannot be a table with a snapshot selector // throw the original exception @@ -927,6 +946,16 @@ private Table load(Identifier ident) { } } + private org.apache.iceberg.Table load(TableIdentifier ident, Map context) { + if (icebergCatalog instanceof org.apache.iceberg.catalog.ContextAwareTableCatalog + && !context.isEmpty()) { + return ((org.apache.iceberg.catalog.ContextAwareTableCatalog) icebergCatalog) + .loadTable(ident, context); + } else { + return icebergCatalog.loadTable(ident); + } + } + private Pair> parseLocationString(String location) { int hashIndex = location.lastIndexOf('#'); if (hashIndex != -1 && !location.endsWith("#")) { diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java index f49660a9f27c..67b6252a53d1 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java @@ -63,7 +63,7 @@ */ public class SparkSessionCatalog< T extends TableCatalog & FunctionCatalog & SupportsNamespaces & ViewCatalog> - extends 
BaseCatalog implements CatalogExtension { + extends BaseCatalog implements CatalogExtension, ContextAwareTableCatalog { private static final String[] DEFAULT_NAMESPACE = new String[] {"default"}; private String catalogName = null; @@ -143,28 +143,60 @@ public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceExcepti @Override public Table loadTable(Identifier ident) throws NoSuchTableException { + return loadTable(ident, Map.of()); + } + + @Override + public Table loadTable(Identifier identifier, Map context) + throws NoSuchTableException { try { - return icebergCatalog.loadTable(ident); - } catch (NoSuchTableException e) { - return getSessionCatalog().loadTable(ident); + if (icebergCatalog instanceof ContextAwareTableCatalog && !context.isEmpty()) { + return ((ContextAwareTableCatalog) icebergCatalog).loadTable(identifier, context); + } else { + return icebergCatalog.loadTable(identifier); + } + } catch (org.apache.iceberg.exceptions.NoSuchTableException | NoSuchTableException e) { + return getSessionCatalog().loadTable(identifier); } } + @Override + public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException { + return loadTable(ident, timestamp, Map.of()); + } + @Override public Table loadTable(Identifier ident, String version) throws NoSuchTableException { + return loadTable(ident, version, Map.of()); + } + + @Override + public Table loadTable(Identifier identifier, String version, Map loadingContext) + throws NoSuchTableException { try { - return icebergCatalog.loadTable(ident, version); - } catch (org.apache.iceberg.exceptions.NoSuchTableException e) { - return getSessionCatalog().loadTable(ident, version); + if (icebergCatalog instanceof ContextAwareTableCatalog && !loadingContext.isEmpty()) { + return ((ContextAwareTableCatalog) icebergCatalog) + .loadTable(identifier, version, loadingContext); + } else { + return icebergCatalog.loadTable(identifier, version); + } + } catch 
(org.apache.iceberg.exceptions.NoSuchTableException | NoSuchTableException e) { + return getSessionCatalog().loadTable(identifier, version); } } @Override - public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException { + public Table loadTable(Identifier identifier, long timestamp, Map loadingContext) + throws NoSuchTableException { try { - return icebergCatalog.loadTable(ident, timestamp); - } catch (org.apache.iceberg.exceptions.NoSuchTableException e) { - return getSessionCatalog().loadTable(ident, timestamp); + if (icebergCatalog instanceof ContextAwareTableCatalog && !loadingContext.isEmpty()) { + return ((ContextAwareTableCatalog) icebergCatalog) + .loadTable(identifier, timestamp, loadingContext); + } else { + return icebergCatalog.loadTable(identifier, timestamp); + } + } catch (org.apache.iceberg.exceptions.NoSuchTableException | NoSuchTableException e) { + return getSessionCatalog().loadTable(identifier, timestamp); } }