13 changes: 12 additions & 1 deletion CHANGELOG.md
@@ -12,11 +12,14 @@

- Fixed a bug where `cloudpickle` was not automatically added to the package list when using `artifact_repository` with custom packages, causing `ModuleNotFoundError` at runtime.
- Fixed a bug when reading XML with a custom schema where the result incorrectly included element attributes when the column was not of `StructType`.
- `DataFrameAnalyticsFunctions.time_series_agg()`: The current row is now excluded from window aggregations to prevent data leakage (see the sketch after the lists below).

#### Improvements

- Reduced the size of queries generated by certain `DataFrame.join` operations.
- Removed redundant aliases in generated queries (for example, `SELECT "A" AS "A"` is now always simplified to `SELECT "A"`).
- Removed experimental warning from `DataFrameAnalyticsFunctions.time_series_agg()`.
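
A minimal sketch of the exclusive-window behavior noted above, assuming the `time_col`/`aggs`/`windows`/`group_by` parameter names used in the tests in this PR; `session` is a placeholder for an active Snowpark session:

```python
from snowflake.snowpark.functions import to_timestamp

# Sample data mirroring the integration tests in this PR.
df = session.create_dataframe(
    [(101, "2023-01-01", 200), (101, "2023-01-02", 100), (101, "2023-01-03", 300)],
    schema=["PRODUCTKEY", "ORDERDATE", "SALESAMOUNT"],
).with_column("ORDERDATE", to_timestamp("ORDERDATE"))

result = df.analytics.time_series_agg(
    time_col="ORDERDATE",
    aggs={"SALESAMOUNT": ["SUM"]},
    windows=["-1D"],          # trailing 1-day window
    group_by=["PRODUCTKEY"],
)
# Previously each row's -1D sum included the row itself (200, 300, 400);
# with the current row excluded it is NULL, 200, 100.
```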


### Snowpark pandas API Updates

@@ -54,7 +57,15 @@
#### Improvements
- `snowflake.snowpark.context.configure_development_features` now applies to multiple sessions, including sessions created after the configuration call, and no longer emits duplicate experimental warnings.
- Removed experimental warning from `DataFrame.to_arrow` and `DataFrame.to_arrow_batches`.
- When both `Session.reduce_describe_query_enabled` and `Session.cte_optimization_enabled` are enabled, fewer DESCRIBE queries are issued when resolving table attributes (sketched below).
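
A minimal sketch of enabling both flags, assuming an already-configured `Session` (the connection parameters are placeholders):

```python
from snowflake.snowpark import Session

session = Session.builder.configs(connection_parameters).create()  # placeholder config
# With both optimizations on, resolving table attributes issues fewer DESCRIBE queries.
session.cte_optimization_enabled = True
session.reduce_describe_query_enabled = True
```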

### Snowpark pandas API Updates

#### New Features

#### Bug Fixes

#### Improvements


## 1.44.0 (2025-12-15)

13 changes: 9 additions & 4 deletions src/snowflake/snowpark/dataframe_analytics_functions.py
@@ -10,7 +10,7 @@
build_expr_from_snowpark_column_or_col_name,
with_src_position,
)
from snowflake.snowpark._internal.utils import experimental, publicapi, warning
from snowflake.snowpark._internal.utils import publicapi, warning
from snowflake.snowpark.column import Column, _to_col_if_str
from snowflake.snowpark.functions import (
_call_function,
@@ -629,7 +629,6 @@ def compute_lead(

return df

@experimental(version="1.12.0")
Contributor: Does this deserve a line in the CHANGELOG?

Collaborator (Author): Updated.

@publicapi
def time_series_agg(
Contributor: I'm curious how changing time_series_agg fixes the issue SNOW-2680714. Are we calling this function with sliding_window_agg, or is the customer passing this in?

Collaborator (Author): Made recent updates; please take a look. We now exclude rows with the same timestamp at second granularity.

self,
@@ -645,6 +644,9 @@
Applies aggregations to the specified columns of the DataFrame over the specified
time windows and grouping criteria.

.. note::
Aggregations exclude rows within 1 second of the current timestamp to prevent data leakage.

Args:
aggs: A dictionary where keys are column names and values are lists of the desired aggregation functions.
windows: Time windows for aggregations using strings such as '7D' for 7 days, where the units are
@@ -776,10 +778,13 @@ def time_series_agg(
years=interval_args.get("y"),
)

one_second = make_interval(seconds=1)
if window_sign > 0:
range_start, range_end = Window.CURRENT_ROW, interval
range_start = one_second
range_end = interval
else:
range_start, range_end = -interval, Window.CURRENT_ROW
range_start = -interval
range_end = -one_second

window_spec = (
Window.partition_by(group_by)
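A minimal sketch of the frame this change builds, assuming `Window.range_between` accepts interval columns the way the surrounding code does, and that `df` resembles the test data below. For a trailing `-1D` window the frame becomes RANGE BETWEEN 1 day PRECEDING AND 1 second PRECEDING, so the current row (and any row in the same second) is excluded:

```python
from snowflake.snowpark import Window
from snowflake.snowpark.functions import make_interval, sum as sum_

interval = make_interval(days=1)       # the "-1D" window from the tests below
one_second = make_interval(seconds=1)

# Trailing window: from 1 day before the current row up to 1 second before it.
window_spec = (
    Window.partition_by("PRODUCTKEY")
    .order_by("ORDERDATE")
    .range_between(-interval, -one_second)
)
result = df.with_column("SUM_SALESAMOUNT_-1D", sum_("SALESAMOUNT").over(window_spec))
```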
87 changes: 68 additions & 19 deletions tests/integ/test_df_analytics.py
@@ -444,16 +444,31 @@ def custom_formatter(input_col, agg, window):
"PRODUCTKEY": [101, 101, 101, 102],
"ORDERDATE": ["2023-01-01", "2023-01-02", "2023-01-03", "2023-01-04"],
"SALESAMOUNT": [200, 100, 300, 250],
"SUM_SALESAMOUNT_1D": [300, 400, 300, 250],
"MAX_SALESAMOUNT_1D": [200, 300, 300, 250],
"SUM_SALESAMOUNT_-1D": [200, 300, 400, 250],
"MAX_SALESAMOUNT_-1D": [200, 200, 300, 250],
"SUM_SALESAMOUNT_2D": [600, 400, 300, 250],
"MAX_SALESAMOUNT_2D": [300, 300, 300, 250],
"SUM_SALESAMOUNT_-2D": [200, 300, 600, 250],
"MAX_SALESAMOUNT_-2D": [200, 200, 300, 250],
"SUM_SALESAMOUNT_-1W": [200, 300, 600, 250],
"MAX_SALESAMOUNT_-1W": [200, 200, 300, 250],
# 1D (future): next day only
"SUM_SALESAMOUNT_1D": [
100,
300,
None,
None,
], # [100(next), 300(next), NULL, NULL]
"MAX_SALESAMOUNT_1D": [100, 300, None, None],
# -1D (past): previous day only
"SUM_SALESAMOUNT_-1D": [
None,
200,
100,
None,
], # [NULL, 200(prev), 100(prev), NULL]
"MAX_SALESAMOUNT_-1D": [None, 200, 100, None],
# 2D (future): next 2 days
"SUM_SALESAMOUNT_2D": [400, 300, None, None], # [100+300, 300, NULL, NULL]
"MAX_SALESAMOUNT_2D": [300, 300, None, None],
# -2D (past): previous 2 days
"SUM_SALESAMOUNT_-2D": [None, 200, 300, None], # [NULL, 200, 200+100, NULL]
"MAX_SALESAMOUNT_-2D": [None, 200, 200, None],
# -1W (past): previous week
"SUM_SALESAMOUNT_-1W": [None, 200, 300, None], # [NULL, 200, 200+100, NULL]
"MAX_SALESAMOUNT_-1W": [None, 200, 200, None],
}
expected_df = pd.DataFrame(expected_data)

@@ -500,9 +515,27 @@ def test_time_series_agg_sub_day_sliding_windows(session):
]
),
"SALESAMOUNT": [400, 300, 200, 100],
"SALESAMOUNT_MAX_-1s": [400, 400, 200, 100],
"SALESAMOUNT_MAX_-1m": [400, 400, 300, 100],
"SALESAMOUNT_MAX_-1h": [400, 400, 400, 200],
# -1s: previous 1 second only (excludes current)
"SALESAMOUNT_MAX_-1s": [
None,
400,
None,
None,
], # [NULL, 400(1s before), NULL(>1s gap), NULL(>1s gap)]
# -1m: previous 1 minute (excludes current)
"SALESAMOUNT_MAX_-1m": [
None,
400,
300,
None,
], # [NULL, 400(1s<1m), 300(60s=1m), NULL(>1m gap)]
# -1h: previous 1 hour (excludes current)
"SALESAMOUNT_MAX_-1h": [
None,
400,
400,
200,
], # [NULL, 400, MAX(400,300), 200(exactly 1h)]
}
expected_df = pd.DataFrame(expected_data)

@@ -552,8 +585,20 @@ def test_time_series_aggregation_grouping_bug_fix(session):
"transaction_4",
],
"QUANTITY": [10, 15, 7, 3],
"QUANTITY_SUM_-1D": [10, 15, 22, 3],
"QUANTITY_SUM_-7D": [10, 15, 22, 25],
# -1D: previous 1 day (excludes current)
"QUANTITY_SUM_-1D": [
None,
None,
15,
None,
], # [NULL, NULL, 15(8h earlier), NULL(>1D gap)]
# -7D: previous 7 days (excludes current)
"QUANTITY_SUM_-7D": [
None,
None,
15,
22,
], # [NULL, NULL, 15(same day), 15+7(2D ago)]
}

expected_df = pd.DataFrame(expected_data)
@@ -609,8 +654,10 @@ def custom_formatter(input_col, agg, window):
"2023-04-20",
],
"SALESAMOUNT": [100, 200, 300, 400, 150, 250, 350, 450],
"SUM_SALESAMOUNT_-2mm": [100, 300, 600, 900, 150, 400, 750, 1050],
"MAX_SALESAMOUNT_-2mm": [100, 200, 300, 400, 150, 250, 350, 450],
# SUM excluding current row
"SUM_SALESAMOUNT_-2mm": [None, 100, 300, 500, None, 150, 400, 600],
# MAX excluding current row
"MAX_SALESAMOUNT_-2mm": [None, 100, 200, 300, None, 150, 250, 350],
}
expected_df = pd.DataFrame(expected_data)
expected_df["ORDERDATE"] = pd.to_datetime(expected_df["ORDERDATE"])
@@ -668,8 +715,10 @@ def custom_formatter(input_col, agg, window):
"2024-01-20",
],
"SALESAMOUNT": [100, 200, 300, 400, 150, 250, 350, 450],
"SUM_SALESAMOUNT_-1Y": [100, 300, 500, 700, 150, 400, 600, 800],
"MAX_SALESAMOUNT_-1Y": [100, 200, 300, 400, 150, 250, 350, 450],
# SUM excluding current row
"SUM_SALESAMOUNT_-1Y": [None, 100, 200, 300, None, 150, 250, 350],
# MAX excluding current row
"MAX_SALESAMOUNT_-1Y": [None, 100, 200, 300, None, 150, 250, 350],
}
expected_df = pd.DataFrame(expected_data)
expected_df["ORDERDATE"] = pd.to_datetime(expected_df["ORDERDATE"])