Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions src/snowflake/snowpark/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -6914,6 +6914,40 @@ def print_schema(self, level: Optional[int] = None) -> None:
"""
print(self._format_schema(level)) # noqa: T201: we need to print here.

def to_snowpark_connect_dataframe(
    self,
    spark_session: "pyspark.sql.SparkSession",
    view_name: Optional[str] = None,
) -> "pyspark.sql.DataFrame":
    """Convert this Snowpark DataFrame to a PySpark DataFrame via Snowpark Connect
    (lazy, no data materialization).

    Creates a Snowflake temporary view from this DataFrame's logical plan, then has
    PySpark read from that view through Snowpark Connect. The query plan is preserved
    end-to-end — no data is materialized until an action is triggered on the returned
    PySpark DataFrame.

    Args:
        spark_session: A PySpark SparkSession connected via Snowpark Connect.
        view_name: Optional custom name for the intermediate Snowflake temporary view.
            Must be a simple unquoted identifier (letters, digits, ``_``, ``$``,
            not starting with a digit). If not provided, a unique name is
            auto-generated.

    Returns:
        A PySpark DataFrame backed by the same Snowflake query plan.

    Raises:
        ValueError: If ``view_name`` is not a valid simple identifier.

    Example::

        >>> snowpark_df = session.create_dataframe([[1, "Alice"], [2, "Bob"]], schema=["id", "name"])
        >>> spark_df = snowpark_df.to_snowpark_connect_dataframe(spark)  # doctest: +SKIP
        >>> spark_df.filter(spark_df.id > 1).show()  # doctest: +SKIP
    """
    import re
    import uuid

    if view_name is None:
        view_name = f"__snowpark_to_spark_{uuid.uuid4().hex[:8]}"
    elif not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_$]*", view_name):
        # view_name is interpolated verbatim into SQL below; restrict it to a
        # simple unquoted Snowflake identifier to rule out SQL injection and
        # identifiers that would require quoting.
        raise ValueError(
            f"view_name must be a valid unquoted identifier, got {view_name!r}"
        )
    self.create_or_replace_temp_view(view_name)
    return spark_session.sql(f"SELECT * FROM {view_name}")

where = filter

# Add the following lines so API docs have them
Expand Down
64 changes: 64 additions & 0 deletions src/snowflake/snowpark/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -4028,6 +4028,70 @@ def convert_row_to_list(

return df

def from_snowpark_connect_dataframe(
    self,
    spark_df: "pyspark.sql.DataFrame",
    view_name: Optional[str] = None,
) -> DataFrame:
    """Create a Snowpark DataFrame from a PySpark DataFrame via Snowpark Connect
    (lazy, no data materialization).

    Registers the PySpark DataFrame as a Spark-local temporary view, then uses
    ``spark.sql()`` to create a real Snowflake view from it (Snowpark Connect
    resolves the local view's logical plan into the DDL sent to Snowflake),
    and finally reads the view back as a Snowpark DataFrame.

    The query plan is preserved end-to-end — no data is materialized until an
    action is triggered on the returned Snowpark DataFrame.

    Args:
        spark_df: A PySpark DataFrame from a Snowpark Connect session.
        view_name: Optional custom name for the intermediate Snowflake view.
            Must be a simple unquoted identifier (letters, digits, ``_``, ``$``,
            not starting with a digit). If not provided, a unique name is
            auto-generated.

    Returns:
        A Snowpark :class:`DataFrame` backed by the same Snowflake query plan.

    Raises:
        ValueError: If ``view_name`` is not a valid simple identifier.

    Example::

        >>> spark_df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])  # doctest: +SKIP
        >>> snowpark_df = session.from_snowpark_connect_dataframe(spark_df)  # doctest: +SKIP
        >>> snowpark_df.show()  # doctest: +SKIP
    """
    import re
    import uuid

    if view_name is None:
        view_name = f"__spark_to_snowpark_{uuid.uuid4().hex[:8]}"
    elif not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_$]*", view_name):
        # view_name is interpolated verbatim into SQL below; restrict it to a
        # simple unquoted identifier to rule out SQL injection and identifiers
        # that would require quoting.
        raise ValueError(
            f"view_name must be a valid unquoted identifier, got {view_name!r}"
        )

    local_view = f"_local_{view_name}"
    spark_session = spark_df.sparkSession

    # Step 1: Register PySpark DF as a Spark-local temp view.
    spark_df.createOrReplaceTempView(local_view)
    try:
        # Step 2: Create a real Snowflake VIEW via spark_session.sql().
        #
        # We MUST use spark_session.sql() (not self.sql()) because only the Spark
        # Connect server can resolve the local temp view name from Step 1.
        #
        # We use CREATE VIEW (permanent), NOT CREATE TEMPORARY VIEW, because
        # Snowpark Connect by default handles "CREATE TEMPORARY VIEW" locally in
        # Spark's in-memory catalog (CreateViewCommand → store_temporary_view_as_dataframe).
        # Only "CREATE VIEW" (CreateView logical plan) actually sends DDL to Snowflake
        # via Snowpark Python's create_or_replace_view().
        #
        # Note: This creates a permanent Snowflake view. Snowflake views are just
        # stored query definitions (no data is materialized). The caller can drop
        # the view after use with: session.sql("DROP VIEW IF EXISTS <name>").collect()
        # TODO: can we create a temporary view in Snowflake using snowpark connect
        # session? A permanent view leaks; the snowpark-connect view implementation
        # could be extended to support a real Snowflake temporary view.
        spark_session.sql(
            f"CREATE OR REPLACE VIEW {view_name} AS SELECT * FROM {local_view}"
        ).collect()
    finally:
        # The Spark-local temp view is only needed while the CREATE VIEW statement
        # above is resolved; drop it so it does not leak into the Spark catalog.
        spark_session.catalog.dropTempView(local_view)

    # Step 3: Return a Snowpark DataFrame backed by the Snowflake view.
    return self.table(view_name)

@publicapi
def range(
self,
Expand Down
Loading