From e88f10e0c84c1f33ae3af17d8760d7424e8c2882 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anton=20Jyrki=C3=A4inen?= Date: Wed, 3 Dec 2025 18:24:09 +0200 Subject: [PATCH 1/8] log full stack trace for recluster errors --- python/durable/reclusterAnalysisActivity/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/durable/reclusterAnalysisActivity/__init__.py b/python/durable/reclusterAnalysisActivity/__init__.py index 3ae1101..c7bc7a8 100644 --- a/python/durable/reclusterAnalysisActivity/__init__.py +++ b/python/durable/reclusterAnalysisActivity/__init__.py @@ -45,7 +45,7 @@ async def main(input: dict) -> None: logger.debug("ReclusterAnalysisActivity completed successfully.") except Exception as e: - logger.debug(f"ReclusterAnalysisActivity error: {e}") + logger.exception("ReclusterAnalysisActivity error") await set_recluster_status( table, from_oday, From 88371cf93e70abbc2515fc3107c78653ab476512 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anton=20Jyrki=C3=A4inen?= Date: Wed, 3 Dec 2025 18:43:40 +0200 Subject: [PATCH 2/8] reverting changes that caused KeyError: 'tst_median' --- python/common/recluster.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/python/common/recluster.py b/python/common/recluster.py index ae997ea..d283fad 100644 --- a/python/common/recluster.py +++ b/python/common/recluster.py @@ -113,9 +113,6 @@ async def load_preprocess_files( compressed_data = r[0] decompressed_csv = decompressor.decompress(compressed_data) df = pd.read_csv(io.BytesIO(decompressed_csv), sep=";") - df["tst_median"] = pd.to_datetime(df["tst_median"], format="ISO8601").dt.tz_convert( - "UTC" - ) dfs.append(df) if not dfs: @@ -438,7 +435,6 @@ def calculate_cluster_features(df: pd.DataFrame, cluster_id_vars_on_2nd_level: l res = res.merge(clust_delay_feats, on=cluster_id_vars_on_2nd_level, how="outer") res["oday_min"] = df.oday.min() res["oday_max"] = df.oday.max() - res["tst_median"] = res["tst_median"].dt.tz_convert("Europe/Helsinki") return res From 71c89c533b0b2b321060381e27f076a1a8db76a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anton=20Jyrki=C3=A4inen?= Date: Fri, 5 Dec 2025 18:56:37 +0200 Subject: [PATCH 3/8] fix calculate_cluster_features() to handle different timezones --- python/common/recluster.py | 37 ++++++++++++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/python/common/recluster.py b/python/common/recluster.py index d283fad..09e7a07 100644 --- a/python/common/recluster.py +++ b/python/common/recluster.py @@ -413,8 +413,20 @@ def calculate_cluster_features(df: pd.DataFrame, cluster_id_vars_on_2nd_level: l pd.DataFrame: clusters with descriptive variables """ - df["tst_median"] = pd.to_datetime(df["tst_median"], format="ISO8601") - df["oday"] = pd.to_datetime(df["oday"]) + df = df.copy() + + for col in ["lat_median", "long_median", "hdg_median", "weight"]: + if col in df.columns: + df[col] = pd.to_numeric(df[col], errors="coerce") + + if "tst_median" in df.columns: + df["tst_median"] = pd.to_datetime(df["tst_median"], errors="coerce", utc=True) + df["tst_median_ns"] = df["tst_median"].view("int64") + else: + df["tst_median_ns"] = pd.Series(index=df.index, dtype="float64") + + if "oday" in df.columns: + df["oday"] = pd.to_datetime(df["oday"], errors="coerce") clust_counts = df.drop_duplicates( subset=[ @@ -430,11 +442,26 @@ def calculate_cluster_features(df: pd.DataFrame, cluster_id_vars_on_2nd_level: l clust_delay_feats = df.groupby(cluster_id_vars_on_2nd_level, observed=False)["weight"].quantile([0.10, 0.25, 0.5, 
0.75, 0.90]).unstack() clust_delay_feats.columns = [(int(x * 100)) for x in clust_delay_feats.columns] clust_delay_feats = clust_delay_feats.add_prefix("q_").reset_index() - median_vars = df.groupby(cluster_id_vars_on_2nd_level, observed=False)[["lat_median", "long_median", "tst_median", "hdg_median"]].median().reset_index() + + median_cols = ["lat_median", "long_median", "hdg_median", "tst_median_ns"] + existing_median_cols = [c for c in median_cols if c in df.columns] + + median_vars = (df.groupby(cluster_id_vars_on_2nd_level, observed=False)[existing_median_cols].median().reset_index()) + + if "tst_median_ns" in median_vars.columns: + median_vars["tst_median"] = pd.to_datetime(median_vars["tst_median_ns"], utc=True) + median_vars = median_vars.drop(columns=["tst_median_ns"]) + res = median_vars.merge(clust_counts, on=cluster_id_vars_on_2nd_level, how="outer") res = res.merge(clust_delay_feats, on=cluster_id_vars_on_2nd_level, how="outer") - res["oday_min"] = df.oday.min() - res["oday_max"] = df.oday.max() + + if "oday" in df.columns: + res["oday_min"] = df["oday"].min() + res["oday_max"] = df["oday"].max() + else: + res["oday_min"] = pd.NaT + res["oday_max"] = pd.NaT + return res From 339c8a1ae472fad405da3696b161c375a8d6aa74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anton=20Jyrki=C3=A4inen?= Date: Mon, 8 Dec 2025 13:40:07 +0200 Subject: [PATCH 4/8] debug get_season --- python/common/recluster.py | 3 ++- python/common/utils.py | 5 +++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/python/common/recluster.py b/python/common/recluster.py index 09e7a07..be29eaa 100644 --- a/python/common/recluster.py +++ b/python/common/recluster.py @@ -476,9 +476,10 @@ def ui_related_var_modifications(df: pd.DataFrame, seasons_and_months: dict, DEP pd.DataFrame: clusters with ui related variables """ df["tst_median"] = pd.to_datetime(df["tst_median"], format="%Y-%m-%d %H:%M:%S", errors="coerce") + print(df["tst_median"]) df["year"] = df["tst_median"].dt.year df["season"] = df["tst_median"].dt.month.map(lambda x: get_season(x, seasons_and_months)) - + print(df["season"]) for k, v in DCLASS_NAMES.items(): df["dclass"] = df["dclass"].replace(k, v) diff --git a/python/common/utils.py b/python/common/utils.py index e5fdf6f..71c2daf 100644 --- a/python/common/utils.py +++ b/python/common/utils.py @@ -53,6 +53,11 @@ def get_target_oday(offset=1): return start_date def get_season(month, seasons_and_months): + print(month) + print(seasons_and_months) + if month is None: + return None + key = [key for key, val in seasons_and_months.items() if month in val][0] return key.lower() From e667da2f3be6a3b2234852772a1e008d61964c9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anton=20Jyrki=C3=A4inen?= Date: Mon, 8 Dec 2025 14:13:13 +0200 Subject: [PATCH 5/8] fix logging --- python/common/recluster.py | 4 ++-- python/common/utils.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/python/common/recluster.py b/python/common/recluster.py index be29eaa..f3d468b 100644 --- a/python/common/recluster.py +++ b/python/common/recluster.py @@ -476,10 +476,10 @@ def ui_related_var_modifications(df: pd.DataFrame, seasons_and_months: dict, DEP pd.DataFrame: clusters with ui related variables """ df["tst_median"] = pd.to_datetime(df["tst_median"], format="%Y-%m-%d %H:%M:%S", errors="coerce") - print(df["tst_median"]) + logger.debug(df["tst_median"]) df["year"] = df["tst_median"].dt.year df["season"] = df["tst_median"].dt.month.map(lambda x: get_season(x, seasons_and_months)) - print(df["season"]) + 
logger.debug(df["season"]) for k, v in DCLASS_NAMES.items(): df["dclass"] = df["dclass"].replace(k, v) diff --git a/python/common/utils.py b/python/common/utils.py index 71c2daf..37f765a 100644 --- a/python/common/utils.py +++ b/python/common/utils.py @@ -53,8 +53,8 @@ def get_target_oday(offset=1): return start_date def get_season(month, seasons_and_months): - print(month) - print(seasons_and_months) + logger.debug(month) + logger.debug(seasons_and_months) if month is None: return None From e79536ffae68cc84c2fab1f1af443b3a6f3ed47c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anton=20Jyrki=C3=A4inen?= Date: Mon, 8 Dec 2025 14:32:24 +0200 Subject: [PATCH 6/8] remove some logging, improve null check --- python/common/recluster.py | 2 -- python/common/utils.py | 3 ++- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/python/common/recluster.py b/python/common/recluster.py index f3d468b..99c4551 100644 --- a/python/common/recluster.py +++ b/python/common/recluster.py @@ -476,10 +476,8 @@ def ui_related_var_modifications(df: pd.DataFrame, seasons_and_months: dict, DEP pd.DataFrame: clusters with ui related variables """ df["tst_median"] = pd.to_datetime(df["tst_median"], format="%Y-%m-%d %H:%M:%S", errors="coerce") - logger.debug(df["tst_median"]) df["year"] = df["tst_median"].dt.year df["season"] = df["tst_median"].dt.month.map(lambda x: get_season(x, seasons_and_months)) - logger.debug(df["season"]) for k, v in DCLASS_NAMES.items(): df["dclass"] = df["dclass"].replace(k, v) diff --git a/python/common/utils.py b/python/common/utils.py index 37f765a..40ec4cf 100644 --- a/python/common/utils.py +++ b/python/common/utils.py @@ -1,3 +1,4 @@ +import pandas as pd import logging as logger from datetime import date, datetime, timedelta, timezone from typing import Tuple @@ -55,7 +56,7 @@ def get_target_oday(offset=1): def get_season(month, seasons_and_months): logger.debug(month) logger.debug(seasons_and_months) - if month is None: + if month is None or pd.isna(month): return None key = [key for key, val in seasons_and_months.items() if month in val][0] From acd3fe81d79f744e5c86b697e765d453aa716c89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anton=20Jyrki=C3=A4inen?= Date: Mon, 8 Dec 2025 16:01:40 +0200 Subject: [PATCH 7/8] tz converts, clean up --- python/common/recluster.py | 13 ++++--------- python/common/utils.py | 2 -- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/python/common/recluster.py b/python/common/recluster.py index 99c4551..5533d8d 100644 --- a/python/common/recluster.py +++ b/python/common/recluster.py @@ -421,12 +421,10 @@ def calculate_cluster_features(df: pd.DataFrame, cluster_id_vars_on_2nd_level: l if "tst_median" in df.columns: df["tst_median"] = pd.to_datetime(df["tst_median"], errors="coerce", utc=True) - df["tst_median_ns"] = df["tst_median"].view("int64") + df["tst_median_ns"] = df["tst_median"].astype("int64") else: df["tst_median_ns"] = pd.Series(index=df.index, dtype="float64") - if "oday" in df.columns: - df["oday"] = pd.to_datetime(df["oday"], errors="coerce") clust_counts = df.drop_duplicates( subset=[ @@ -450,17 +448,14 @@ def calculate_cluster_features(df: pd.DataFrame, cluster_id_vars_on_2nd_level: l if "tst_median_ns" in median_vars.columns: median_vars["tst_median"] = pd.to_datetime(median_vars["tst_median_ns"], utc=True) + median_vars["tst_median"] = median_vars["tst_median"].dt.tz_convert("Europe/Helsinki") median_vars = median_vars.drop(columns=["tst_median_ns"]) res = median_vars.merge(clust_counts, on=cluster_id_vars_on_2nd_level, 
how="outer") res = res.merge(clust_delay_feats, on=cluster_id_vars_on_2nd_level, how="outer") - if "oday" in df.columns: - res["oday_min"] = df["oday"].min() - res["oday_max"] = df["oday"].max() - else: - res["oday_min"] = pd.NaT - res["oday_max"] = pd.NaT + res["oday_min"] = df["oday"].min() + res["oday_max"] = df["oday"].max() return res diff --git a/python/common/utils.py b/python/common/utils.py index 40ec4cf..f91377b 100644 --- a/python/common/utils.py +++ b/python/common/utils.py @@ -54,8 +54,6 @@ def get_target_oday(offset=1): return start_date def get_season(month, seasons_and_months): - logger.debug(month) - logger.debug(seasons_and_months) if month is None or pd.isna(month): return None From 4a152317332757fc3bcffdafa759c4c4df88c248 Mon Sep 17 00:00:00 2001 From: Michal Kurant Date: Wed, 7 Jan 2026 10:55:48 +0100 Subject: [PATCH 8/8] hide get_speeding_hfp endpoint --- python/api/routers/hfp.py | 146 ------------------------------------- python/api/services/hfp.py | 54 -------------- 2 files changed, 200 deletions(-) diff --git a/python/api/routers/hfp.py b/python/api/routers/hfp.py index 5019671..d4a76ab 100644 --- a/python/api/routers/hfp.py +++ b/python/api/routers/hfp.py @@ -29,7 +29,6 @@ from api.services.hfp import ( get_hfp_data, - get_speeding_data, upload_missing_preprocess_data_to_db, ) from api.services.tlp import get_tlp_data, get_tlp_data_as_json @@ -360,151 +359,6 @@ async def get_tlp_raw_data( ) return response - -@router.get( - "/speeding", - summary="Get speeding data by route id, given speed limit, tst range and bounding box", - description="Returns speeding data in a gzip compressed csv file.", - response_class=GzippedFileResponse, - responses={ - 200: { - "description": "Successful query. The data is returned as an attachment in the response. " - "File format comes from query parameters: " - "`speeding-export____.csv.gz`", - "content": {"application/gzip": {"schema": None, "example": None}}, - "headers": { - "Content-Disposition": { - "schema": { - "example": 'attachment; filename="speeding-export_20240915_20240923_2015_20.csv"' - } - } - }, - }, - 204: {"description": "Query returned no data with the given parameters."}, - }, -) -async def get_speeding( - route_id: int = Query( - default=None, - title="Route ID", - description="JORE ID of the route", - example=2015, - ), - min_spd: int = Query( - default=None, - title="Speed limit", - description="Speed limit in km/h", - example=23, - ), - from_tst: datetime = Query( - title="Minimum timestamp", - description=( - "The timestamp from which the data will be queried. (tst in HFP payload) " - "Timestamp will be read as UTC" - ), - example="2024-09-15T00:00:00", - ), - to_tst: datetime = Query( - default=None, - title="Maximum timestamp", - description=( - "The timestamp to which the data will be queried. (tst in HFP payload) " - "Timestamp will be read as UTC" - ), - example="2024-09-23T00:00:00", - ), - x_min: int = Query( - default=None, - title="x_min", - description="Coordinate of south-west corner of the bounding box (x_min, y_min). Coordinate should be given in ETRS-TM35FIN coordinate system.", - example=378651, - ), - y_min: int = Query( - default=None, - title="y_min", - description="Coordinate of south-west corner of the bounding box (x_min, y_min). Coordinate should be given in ETRS-TM35FIN coordinate system.", - example=6677277, - ), - x_max: int = Query( - default=None, - title="x_max", - description="Coordinate of north-east corner of the bounding box (x_max, y_max). 
Coordinate should be given in ETRS-TM35FIN coordinate system.", - example=378893, - ), - y_max: int = Query( - default=None, - title="y_max", - description="Coordinate of north-east corner of the bounding box (x_max, y_max). Coordinate should be given in ETRS-TM35FIN coordinate system.", - example=6677652, - ), -) -> JSONResponse: - with CustomDbLogHandler("api"): - fetch_start_time = time.time() - - input_stream = io.BytesIO() - output_stream = io.BytesIO() - - required_params = { - "route_id": route_id, - "min_spd": min_spd, - "from_tst": from_tst, - "to_tst": to_tst, - "x_min": x_min, - "y_min": y_min, - "x_max": x_max, - "y_max": y_max, - } - - missing_params = [ - param_name - for param_name, param_value in required_params.items() - if param_value is None - ] - - if missing_params: - logger.error(f"Missing required parameters: {', '.join(missing_params)}") - raise HTTPException( - status_code=HTTPStatus.UNPROCESSABLE_ENTITY, - detail=f"The following parameters are missing: {', '.join(missing_params)}", - ) - logger.debug( - f"Fetching speeding data. route_id: {route_id}, min_spd: {min_spd}, from_tst: {from_tst}, to_tst:{to_tst}" - ) - data = await get_speeding_data( - route_id, - min_spd, - from_tst, - to_tst, - x_min, - y_min, - x_max, - y_max, - input_stream, - ) - logger.debug(f"Speeding data for {route_id} received. Compressing.") - input_stream.seek(0) - with gzip.GzipFile(fileobj=output_stream, mode="wb") as compressed_data_stream: - for data in iter(lambda: input_stream.read(CHUNK_SIZE), b""): - compressed_data_stream.write(data) - - filename = create_filename( - "speeding-export_", - from_tst.strftime("%Y%m%d"), - to_tst.strftime("%Y%m%d"), - route_id, - min_spd, - ) - response = GzippedFileResponse( - filename=filename, content=output_stream.getvalue() - ) - - logger.debug( - f"Speeding data fetch and export completed in {int(time.time() - fetch_start_time)} seconds. Exported file: {filename}" - ) - - return response - - @router.get( "/delay_analytics", summary="Get delay analytics data.", diff --git a/python/api/services/hfp.py b/python/api/services/hfp.py index 0dfe8a0..fa683e8 100644 --- a/python/api/services/hfp.py +++ b/python/api/services/hfp.py @@ -68,60 +68,6 @@ async def get_hfp_data( stream.write(row) return row_count - -async def get_speeding_data( - route_id: int, - min_spd: int, - from_tst: datetime, - to_tst: datetime, - x_min: int, - y_min: int, - x_max: int, - y_max: int, - stream: BytesIO, -): - # Speed limit given in km/h. Convert to m/s - min_spd = min_spd / 3.6 - - async with pool.connection() as conn: - async with conn.cursor().copy( - """ - COPY ( - SELECT - (hp.spd * 3.6) AS spd_km, - hp.oday, - hp."start", - hp.direction_id, - hp.vehicle_number, - hp.point_timestamp - FROM - hfp.hfp_point hp - WHERE - hp.spd > %(min_spd)s - AND hp.point_timestamp > %(from_tst)s - AND hp.point_timestamp < %(to_tst)s - AND hp.route_id = '%(route_id)s' - AND hp.hfp_event = 'VP' - AND hp.geom && ST_MakeEnvelope(%(x_min)s, %(y_min)s, %(x_max)s, %(y_max)s, 3067) - ) TO STDOUT WITH CSV HEADER - """, - { - "min_spd": min_spd, - "from_tst": from_tst, - "to_tst": to_tst, - "route_id": route_id, - "x_min": x_min, - "y_min": y_min, - "x_max": x_max, - "y_max": y_max, - }, - ) as copy: - row_count = 0 - async for row in copy: - row_count += 1 - stream.write(row) - return row_count - async def upload_missing_preprocess_data_to_db( client: FlowAnalyticsContainerClient, missing_blobs: List[PreprocessBlobModel],
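
Note on PATCH 1/8: logging.exception() records the active exception at ERROR level together with its full traceback, whereas the previous logger.debug(f"... {e}") emitted only the exception message, and only at DEBUG level. A minimal, self-contained sketch of the pattern; the logger name and risky_step() are invented for the demo and are not part of this repo:

import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("recluster_demo")  # hypothetical demo logger

def risky_step():
    raise ValueError("simulated recluster failure")

try:
    risky_step()
except Exception:
    # Unlike logger.debug(f"... {e}"), logger.exception() logs at ERROR
    # level and appends the full stack trace of the active exception.
    logger.exception("ReclusterAnalysisActivity error")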
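Note on PATCH 3/8: the patch coerces lat_median, long_median, hdg_median and weight with pd.to_numeric(errors="coerce") before the groupby aggregations, so malformed values in the preprocess CSVs become NaN and are skipped by median()/quantile() instead of raising. A tiny sketch, with an invented column value standing in for bad input:

import pandas as pd

df = pd.DataFrame({"lat_median": ["60.17", "bad value", "60.21"]})

# Invalid entries become NaN, which median()/quantile() then skip.
df["lat_median"] = pd.to_numeric(df["lat_median"], errors="coerce")
print(df["lat_median"].median())  # 60.19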
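Note on PATCH 3/8 and PATCH 7/8: taken together, the series computes per-cluster medians of tst_median by normalising the column to UTC, aggregating over its int64 epoch-nanosecond representation, and converting the result back to a timestamp in Europe/Helsinki; patch 7 also swaps the deprecated .view("int64") for .astype("int64"). A standalone sketch of that technique, assuming a toy DataFrame whose column names mirror the patch but whose values are invented:

import pandas as pd

df = pd.DataFrame({
    "cluster": ["a", "a", "a", "b"],
    "tst_median": [
        "2024-09-15T06:00:00+03:00",
        "2024-09-15T07:00:00+03:00",
        "2024-09-15T12:00:00+03:00",
        "2024-09-16T10:00:00+02:00",
    ],
})

# Normalise mixed UTC offsets into one UTC-aware column (as in patch 3).
df["tst_median"] = pd.to_datetime(df["tst_median"], errors="coerce", utc=True)

# Aggregate on plain integers: epoch nanoseconds (patch 7 uses astype).
df["tst_median_ns"] = df["tst_median"].astype("int64")
median_ns = df.groupby("cluster")["tst_median_ns"].median()

# Back to timestamps, presented in Helsinki local time as in patch 7.
median_ns = median_ns.round().astype("int64")  # keep exact nanoseconds
tst_median = pd.to_datetime(median_ns, utc=True).dt.tz_convert("Europe/Helsinki")
print(tst_median)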
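Note on PATCH 6/8: Series.dt.month yields NaN (not None) for rows whose timestamp failed to parse, and "month is None" does not catch NaN, which is why the guard gains a pd.isna(month) check. A small sketch of the guarded lookup, using an invented seasons_and_months mapping (the real one comes from configuration):

import pandas as pd

# Invented example mapping; only the shape matters here.
seasons_and_months = {
    "WINTER": [12, 1, 2],
    "SPRING": [3, 4, 5],
    "SUMMER": [6, 7, 8],
    "AUTUMN": [9, 10, 11],
}

def get_season(month, seasons_and_months):
    # "month is None" misses float NaN coming from .dt.month on NaT,
    # so pd.isna() is needed as well.
    if month is None or pd.isna(month):
        return None

    key = [key for key, val in seasons_and_months.items() if month in val][0]
    return key.lower()

print(get_season(7, seasons_and_months))             # "summer"
print(get_season(float("nan"), seasons_and_months))  # None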