diff --git a/CHANGELOG.md b/CHANGELOG.md index 560c878285..aa846dfd09 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,11 +12,14 @@ - Fixed a bug where `cloudpickle` was not automatically added to the package list when using `artifact_repository` with custom packages, causing `ModuleNotFoundError` at runtime. - Fixed a bug when reading xml with custom schema, result include element attributes when column is not `StructType` type. +- Fixed a bug in `DataFrameAnalyticsFunctions.time_series_agg()` where the current row was included in its own window aggregation, causing data leakage. Each window now starts 1 second away from the current row's timestamp, so the current row and any row less than 1 second from it are excluded; a window that contains no other rows yields `NULL`. #### Improvements - Reduced the size of queries generated by certain `DataFrame.join` operations. - Removed redundant aliases in generated queries (for example, `SELECT "A" AS "A"` is now always simplified to `SELECT "A"`). +- Removed experimental warning from `DataFrameAnalyticsFunctions.time_series_agg()`. + ### Snowpark pandas API Updates @@ -54,7 +57,15 @@ #### Improvements - `snowflake.snowpark.context.configure_development_features` is effective for multiple sessions including newly created sessions after the configuration. No duplicate experimental warning any more. - Removed experimental warning from `DataFrame.to_arrow` and `DataFrame.to_arrow_batches`. -- When both `Session.reduce_describe_query_enabled` and `Session.cte_optimization_enabled` are enabled, fewer DESCRIBE queries are issued when resolving table attributes. 
+ +### Snowpark pandas API Updates + +#### New Features + +#### Bug Fixes + +#### Improvements + ## 1.44.0 (2025-12-15) diff --git a/src/snowflake/snowpark/dataframe_analytics_functions.py b/src/snowflake/snowpark/dataframe_analytics_functions.py index f60159d4b8..024edab487 100644 --- a/src/snowflake/snowpark/dataframe_analytics_functions.py +++ b/src/snowflake/snowpark/dataframe_analytics_functions.py @@ -10,7 +10,7 @@ build_expr_from_snowpark_column_or_col_name, with_src_position, ) -from snowflake.snowpark._internal.utils import experimental, publicapi, warning +from snowflake.snowpark._internal.utils import publicapi, warning from snowflake.snowpark.column import Column, _to_col_if_str from snowflake.snowpark.functions import ( _call_function, @@ -629,7 +629,6 @@ def compute_lead( return df - @experimental(version="1.12.0") @publicapi def time_series_agg( self, @@ -645,6 +644,9 @@ def time_series_agg( Applies aggregations to the specified columns of the DataFrame over specified time windows, and grouping criteria. + .. note:: + To prevent data leakage, each aggregation excludes the current row and any other row whose timestamp is less than 1 second away from it; rows exactly 1 second away are included, and a window with no qualifying rows yields NULL. + Args: aggs: A dictionary where keys are column names and values are lists of the desired aggregation functions. 
windows: Time windows for aggregations using strings such as '7D' for 7 days, where the units are @@ -776,10 +778,13 @@ def time_series_agg( years=interval_args.get("y"), ) + one_second = make_interval(seconds=1) if window_sign > 0: - range_start, range_end = Window.CURRENT_ROW, interval + range_start = one_second + range_end = interval else: - range_start, range_end = -interval, Window.CURRENT_ROW + range_start = -interval + range_end = -one_second window_spec = ( Window.partition_by(group_by) diff --git a/tests/integ/test_df_analytics.py b/tests/integ/test_df_analytics.py index 8c678ac65b..7f4f0b5cb2 100644 --- a/tests/integ/test_df_analytics.py +++ b/tests/integ/test_df_analytics.py @@ -444,16 +444,31 @@ def custom_formatter(input_col, agg, window): "PRODUCTKEY": [101, 101, 101, 102], "ORDERDATE": ["2023-01-01", "2023-01-02", "2023-01-03", "2023-01-04"], "SALESAMOUNT": [200, 100, 300, 250], - "SUM_SALESAMOUNT_1D": [300, 400, 300, 250], - "MAX_SALESAMOUNT_1D": [200, 300, 300, 250], - "SUM_SALESAMOUNT_-1D": [200, 300, 400, 250], - "MAX_SALESAMOUNT_-1D": [200, 200, 300, 250], - "SUM_SALESAMOUNT_2D": [600, 400, 300, 250], - "MAX_SALESAMOUNT_2D": [300, 300, 300, 250], - "SUM_SALESAMOUNT_-2D": [200, 300, 600, 250], - "MAX_SALESAMOUNT_-2D": [200, 200, 300, 250], - "SUM_SALESAMOUNT_-1W": [200, 300, 600, 250], - "MAX_SALESAMOUNT_-1W": [200, 200, 300, 250], + # 1D (future): next day only + "SUM_SALESAMOUNT_1D": [ + 100, + 300, + None, + None, + ], # [100(next), 300(next), NULL, NULL] + "MAX_SALESAMOUNT_1D": [100, 300, None, None], + # -1D (past): previous day only + "SUM_SALESAMOUNT_-1D": [ + None, + 200, + 100, + None, + ], # [NULL, 200(prev), 100(prev), NULL] + "MAX_SALESAMOUNT_-1D": [None, 200, 100, None], + # 2D (future): next 2 days + "SUM_SALESAMOUNT_2D": [400, 300, None, None], # [100+300, 300, NULL, NULL] + "MAX_SALESAMOUNT_2D": [300, 300, None, None], + # -2D (past): previous 2 days + "SUM_SALESAMOUNT_-2D": [None, 200, 300, None], # [NULL, 200, 200+100, NULL] + 
"MAX_SALESAMOUNT_-2D": [None, 200, 200, None], + # -1W (past): previous week + "SUM_SALESAMOUNT_-1W": [None, 200, 300, None], # [NULL, 200, 200+100, NULL] + "MAX_SALESAMOUNT_-1W": [None, 200, 200, None], } expected_df = pd.DataFrame(expected_data) @@ -500,9 +515,27 @@ def test_time_series_agg_sub_day_sliding_windows(session): ] ), "SALESAMOUNT": [400, 300, 200, 100], - "SALESAMOUNT_MAX_-1s": [400, 400, 200, 100], - "SALESAMOUNT_MAX_-1m": [400, 400, 300, 100], - "SALESAMOUNT_MAX_-1h": [400, 400, 400, 200], + # -1s: previous 1 second only (excludes current) + "SALESAMOUNT_MAX_-1s": [ + None, + 400, + None, + None, + ], # [NULL, 400(1s before), NULL(>1s gap), NULL(>1s gap)] + # -1m: previous 1 minute (excludes current) + "SALESAMOUNT_MAX_-1m": [ + None, + 400, + 300, + None, + ], # [NULL, 400(1s<1m), 300(60s=1m), NULL(>1m gap)] + # -1h: previous 1 hour (excludes current) + "SALESAMOUNT_MAX_-1h": [ + None, + 400, + 400, + 200, + ], # [NULL, 400, MAX(400,300), 200(exactly 1h)] } expected_df = pd.DataFrame(expected_data) @@ -552,8 +585,20 @@ def test_time_series_aggregation_grouping_bug_fix(session): "transaction_4", ], "QUANTITY": [10, 15, 7, 3], - "QUANTITY_SUM_-1D": [10, 15, 22, 3], - "QUANTITY_SUM_-7D": [10, 15, 22, 25], + # -1D: previous 1 day (excludes current) + "QUANTITY_SUM_-1D": [ + None, + None, + 15, + None, + ], # [NULL, NULL, 15(8h earlier), NULL(>1D gap)] + # -7D: previous 7 days (excludes current) + "QUANTITY_SUM_-7D": [ + None, + None, + 15, + 22, + ], # [NULL, NULL, 15(same day), 15+7(2D ago)] } expected_df = pd.DataFrame(expected_data) @@ -609,8 +654,10 @@ def custom_formatter(input_col, agg, window): "2023-04-20", ], "SALESAMOUNT": [100, 200, 300, 400, 150, 250, 350, 450], - "SUM_SALESAMOUNT_-2mm": [100, 300, 600, 900, 150, 400, 750, 1050], - "MAX_SALESAMOUNT_-2mm": [100, 200, 300, 400, 150, 250, 350, 450], + # SUM excluding current row + "SUM_SALESAMOUNT_-2mm": [None, 100, 300, 500, None, 150, 400, 600], + # MAX excluding current row + 
"MAX_SALESAMOUNT_-2mm": [None, 100, 200, 300, None, 150, 250, 350], } expected_df = pd.DataFrame(expected_data) expected_df["ORDERDATE"] = pd.to_datetime(expected_df["ORDERDATE"]) @@ -668,8 +715,10 @@ def custom_formatter(input_col, agg, window): "2024-01-20", ], "SALESAMOUNT": [100, 200, 300, 400, 150, 250, 350, 450], - "SUM_SALESAMOUNT_-1Y": [100, 300, 500, 700, 150, 400, 600, 800], - "MAX_SALESAMOUNT_-1Y": [100, 200, 300, 400, 150, 250, 350, 450], + # SUM excluding current row + "SUM_SALESAMOUNT_-1Y": [None, 100, 200, 300, None, 150, 250, 350], + # MAX excluding current row + "MAX_SALESAMOUNT_-1Y": [None, 100, 200, 300, None, 150, 250, 350], } expected_df = pd.DataFrame(expected_data) expected_df["ORDERDATE"] = pd.to_datetime(expected_df["ORDERDATE"])