diff --git a/python/api/routers/hfp.py b/python/api/routers/hfp.py
index 5019671..d4a76ab 100644
--- a/python/api/routers/hfp.py
+++ b/python/api/routers/hfp.py
@@ -29,7 +29,6 @@ from api.services.hfp import (
     get_hfp_data,
-    get_speeding_data,
     upload_missing_preprocess_data_to_db,
 )
 from api.services.tlp import get_tlp_data, get_tlp_data_as_json
@@ -360,151 +359,6 @@ async def get_tlp_raw_data(
         )
         return response
-
-@router.get(
-    "/speeding",
-    summary="Get speeding data by route id, given speed limit, tst range and bounding box",
-    description="Returns speeding data in a gzip compressed csv file.",
-    response_class=GzippedFileResponse,
-    responses={
-        200: {
-            "description": "Successful query. The data is returned as an attachment in the response. "
-            "File format comes from query parameters: "
-            "`speeding-export____.csv.gz`",
-            "content": {"application/gzip": {"schema": None, "example": None}},
-            "headers": {
-                "Content-Disposition": {
-                    "schema": {
-                        "example": 'attachment; filename="speeding-export_20240915_20240923_2015_20.csv"'
-                    }
-                }
-            },
-        },
-        204: {"description": "Query returned no data with the given parameters."},
-    },
-)
-async def get_speeding(
-    route_id: int = Query(
-        default=None,
-        title="Route ID",
-        description="JORE ID of the route",
-        example=2015,
-    ),
-    min_spd: int = Query(
-        default=None,
-        title="Speed limit",
-        description="Speed limit in km/h",
-        example=23,
-    ),
-    from_tst: datetime = Query(
-        title="Minimum timestamp",
-        description=(
-            "The timestamp from which the data will be queried. (tst in HFP payload) "
-            "Timestamp will be read as UTC"
-        ),
-        example="2024-09-15T00:00:00",
-    ),
-    to_tst: datetime = Query(
-        default=None,
-        title="Maximum timestamp",
-        description=(
-            "The timestamp to which the data will be queried. (tst in HFP payload) "
-            "Timestamp will be read as UTC"
-        ),
-        example="2024-09-23T00:00:00",
-    ),
-    x_min: int = Query(
-        default=None,
-        title="x_min",
-        description="Coordinate of south-west corner of the bounding box (x_min, y_min). Coordinate should be given in ETRS-TM35FIN coordinate system.",
-        example=378651,
-    ),
-    y_min: int = Query(
-        default=None,
-        title="y_min",
-        description="Coordinate of south-west corner of the bounding box (x_min, y_min). Coordinate should be given in ETRS-TM35FIN coordinate system.",
-        example=6677277,
-    ),
-    x_max: int = Query(
-        default=None,
-        title="x_max",
-        description="Coordinate of north-east corner of the bounding box (x_max, y_max). Coordinate should be given in ETRS-TM35FIN coordinate system.",
-        example=378893,
-    ),
-    y_max: int = Query(
-        default=None,
-        title="y_max",
-        description="Coordinate of north-east corner of the bounding box (x_max, y_max). Coordinate should be given in ETRS-TM35FIN coordinate system.",
-        example=6677652,
-    ),
-) -> JSONResponse:
-    with CustomDbLogHandler("api"):
-        fetch_start_time = time.time()
-
-        input_stream = io.BytesIO()
-        output_stream = io.BytesIO()
-
-        required_params = {
-            "route_id": route_id,
-            "min_spd": min_spd,
-            "from_tst": from_tst,
-            "to_tst": to_tst,
-            "x_min": x_min,
-            "y_min": y_min,
-            "x_max": x_max,
-            "y_max": y_max,
-        }
-
-        missing_params = [
-            param_name
-            for param_name, param_value in required_params.items()
-            if param_value is None
-        ]
-
-        if missing_params:
-            logger.error(f"Missing required parameters: {', '.join(missing_params)}")
-            raise HTTPException(
-                status_code=HTTPStatus.UNPROCESSABLE_ENTITY,
-                detail=f"The following parameters are missing: {', '.join(missing_params)}",
-            )
-        logger.debug(
-            f"Fetching speeding data. route_id: {route_id}, min_spd: {min_spd}, from_tst: {from_tst}, to_tst:{to_tst}"
-        )
-        data = await get_speeding_data(
-            route_id,
-            min_spd,
-            from_tst,
-            to_tst,
-            x_min,
-            y_min,
-            x_max,
-            y_max,
-            input_stream,
-        )
-        logger.debug(f"Speeding data for {route_id} received. Compressing.")
-        input_stream.seek(0)
-        with gzip.GzipFile(fileobj=output_stream, mode="wb") as compressed_data_stream:
-            for data in iter(lambda: input_stream.read(CHUNK_SIZE), b""):
-                compressed_data_stream.write(data)
-
-        filename = create_filename(
-            "speeding-export_",
-            from_tst.strftime("%Y%m%d"),
-            to_tst.strftime("%Y%m%d"),
-            route_id,
-            min_spd,
-        )
-        response = GzippedFileResponse(
-            filename=filename, content=output_stream.getvalue()
-        )
-
-        logger.debug(
-            f"Speeding data fetch and export completed in {int(time.time() - fetch_start_time)} seconds. Exported file: {filename}"
-        )
-
-        return response
-
-
 @router.get(
     "/delay_analytics",
     summary="Get delay analytics data.",
diff --git a/python/api/services/hfp.py b/python/api/services/hfp.py
index 0dfe8a0..fa683e8 100644
--- a/python/api/services/hfp.py
+++ b/python/api/services/hfp.py
@@ -68,60 +68,6 @@ async def get_hfp_data(
                 stream.write(row)
     return row_count
-
-async def get_speeding_data(
-    route_id: int,
-    min_spd: int,
-    from_tst: datetime,
-    to_tst: datetime,
-    x_min: int,
-    y_min: int,
-    x_max: int,
-    y_max: int,
-    stream: BytesIO,
-):
-    # Speed limit given in km/h. Convert to m/s
-    min_spd = min_spd / 3.6
-
-    async with pool.connection() as conn:
-        async with conn.cursor().copy(
-            """
-            COPY (
-                SELECT
-                    (hp.spd * 3.6) AS spd_km,
-                    hp.oday,
-                    hp."start",
-                    hp.direction_id,
-                    hp.vehicle_number,
-                    hp.point_timestamp
-                FROM
-                    hfp.hfp_point hp
-                WHERE
-                    hp.spd > %(min_spd)s
-                    AND hp.point_timestamp > %(from_tst)s
-                    AND hp.point_timestamp < %(to_tst)s
-                    AND hp.route_id = '%(route_id)s'
-                    AND hp.hfp_event = 'VP'
-                    AND hp.geom && ST_MakeEnvelope(%(x_min)s, %(y_min)s, %(x_max)s, %(y_max)s, 3067)
-            ) TO STDOUT WITH CSV HEADER
-            """,
-            {
-                "min_spd": min_spd,
-                "from_tst": from_tst,
-                "to_tst": to_tst,
-                "route_id": route_id,
-                "x_min": x_min,
-                "y_min": y_min,
-                "x_max": x_max,
-                "y_max": y_max,
-            },
-        ) as copy:
-            row_count = 0
-            async for row in copy:
-                row_count += 1
-                stream.write(row)
-    return row_count
-
 async def upload_missing_preprocess_data_to_db(
     client: FlowAnalyticsContainerClient,
     missing_blobs: List[PreprocessBlobModel],
diff --git a/python/common/recluster.py b/python/common/recluster.py
index ae997ea..5533d8d 100644
--- a/python/common/recluster.py
+++ b/python/common/recluster.py
@@ -113,9 +113,6 @@ async def load_preprocess_files(
         compressed_data = r[0]
         decompressed_csv = decompressor.decompress(compressed_data)
         df = pd.read_csv(io.BytesIO(decompressed_csv), sep=";")
-        df["tst_median"] = pd.to_datetime(df["tst_median"], format="ISO8601").dt.tz_convert(
-            "UTC"
-        )
         dfs.append(df)
 
     if not dfs:
@@ -416,8 +413,18 @@ def calculate_cluster_features(df: pd.DataFrame, cluster_id_vars_on_2nd_level: l
         pd.DataFrame: clusters with descriptive variables
     """
-    df["tst_median"] = pd.to_datetime(df["tst_median"], format="ISO8601")
-    df["oday"] = pd.to_datetime(df["oday"])
+    df = df.copy()
+
+    for col in ["lat_median", "long_median", "hdg_median", "weight"]:
+        if col in df.columns:
+            df[col] = pd.to_numeric(df[col], errors="coerce")
+
+    if "tst_median" in df.columns:
+        df["tst_median"] = pd.to_datetime(df["tst_median"], errors="coerce", utc=True)
+        df["tst_median_ns"] = df["tst_median"].astype("int64")
+    else:
+        df["tst_median_ns"] = pd.Series(index=df.index, dtype="float64")
+
     clust_counts = df.drop_duplicates(
         subset=[
@@ -433,12 +440,23 @@ def calculate_cluster_features(df: pd.DataFrame, cluster_id_vars_on_2nd_level: l
         ]
     )
     clust_delay_feats = df.groupby(cluster_id_vars_on_2nd_level, observed=False)["weight"].quantile([0.10, 0.25, 0.5, 0.75, 0.90]).unstack()
     clust_delay_feats.columns = [(int(x * 100)) for x in clust_delay_feats.columns]
     clust_delay_feats = clust_delay_feats.add_prefix("q_").reset_index()
-    median_vars = df.groupby(cluster_id_vars_on_2nd_level, observed=False)[["lat_median", "long_median", "tst_median", "hdg_median"]].median().reset_index()
+
+    median_cols = ["lat_median", "long_median", "hdg_median", "tst_median_ns"]
+    existing_median_cols = [c for c in median_cols if c in df.columns]
+
+    median_vars = (df.groupby(cluster_id_vars_on_2nd_level, observed=False)[existing_median_cols].median().reset_index())
+
+    if "tst_median_ns" in median_vars.columns:
+        median_vars["tst_median"] = pd.to_datetime(median_vars["tst_median_ns"], utc=True)
+        median_vars["tst_median"] = median_vars["tst_median"].dt.tz_convert("Europe/Helsinki")
+        median_vars = median_vars.drop(columns=["tst_median_ns"])
+
     res = median_vars.merge(clust_counts, on=cluster_id_vars_on_2nd_level, how="outer")
     res = res.merge(clust_delay_feats, on=cluster_id_vars_on_2nd_level, how="outer")
-    res["oday_min"] = df.oday.min()
-    res["oday_max"] = df.oday.max()
-    res["tst_median"] = res["tst_median"].dt.tz_convert("Europe/Helsinki")
+
+    res["oday_min"] = df["oday"].min()
+    res["oday_max"] = df["oday"].max()
+
     return res
@@ -455,7 +473,6 @@ def ui_related_var_modifications(df: pd.DataFrame, seasons_and_months: dict, DEP
     df["tst_median"] = pd.to_datetime(df["tst_median"], format="%Y-%m-%d %H:%M:%S", errors="coerce")
     df["year"] = df["tst_median"].dt.year
     df["season"] = df["tst_median"].dt.month.map(lambda x: get_season(x, seasons_and_months))
-
     for k, v in DCLASS_NAMES.items():
         df["dclass"] = df["dclass"].replace(k, v)
diff --git a/python/common/utils.py b/python/common/utils.py
index e5fdf6f..f91377b 100644
--- a/python/common/utils.py
+++ b/python/common/utils.py
@@ -1,3 +1,4 @@
+import pandas as pd
 import logging as logger
 from datetime import date, datetime, timedelta, timezone
 from typing import Tuple
@@ -53,6 +54,9 @@ def get_target_oday(offset=1):
     return start_date
 
 
 def get_season(month, seasons_and_months):
+    if month is None or pd.isna(month):
+        return None
+
     key = [key for key, val in seasons_and_months.items() if month in val][0]
     return key.lower()
diff --git a/python/durable/reclusterAnalysisActivity/__init__.py b/python/durable/reclusterAnalysisActivity/__init__.py
index 3ae1101..c7bc7a8 100644
--- a/python/durable/reclusterAnalysisActivity/__init__.py
+++ b/python/durable/reclusterAnalysisActivity/__init__.py
@@ -45,7 +45,7 @@ async def main(input: dict) -> None:
         logger.debug("ReclusterAnalysisActivity completed successfully.")
     except Exception as e:
-        logger.debug(f"ReclusterAnalysisActivity error: {e}")
+        logger.exception("ReclusterAnalysisActivity error")
         await set_recluster_status(
             table,
            from_oday,
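
Note on the recluster.py hunks: `calculate_cluster_features` now takes per-cluster medians of tz-aware timestamps by round-tripping `tst_median` through integer nanoseconds (`tst_median_ns`), then converting the grouped median back to a timestamp in Europe/Helsinki time. A minimal sketch of that pattern is below; the sample frame and the `route_id` grouping column are hypothetical, and it assumes every `tst_median` value parses cleanly (NaT rows would need separate handling before `astype("int64")`).

```python
import pandas as pd

# Hypothetical input resembling the preprocess rows handled in recluster.py.
df = pd.DataFrame(
    {
        "route_id": ["2015", "2015", "1071"],
        "tst_median": [
            "2024-09-15T06:30:00Z",
            "2024-09-15T06:40:00Z",
            "2024-09-16T12:00:00+03:00",
        ],
    }
)

# Parse to tz-aware UTC timestamps; unparseable strings become NaT.
df["tst_median"] = pd.to_datetime(df["tst_median"], errors="coerce", utc=True)

# Represent timestamps as integer nanoseconds since the epoch so an ordinary
# numeric median can be taken per group (assumes no NaT values here).
df["tst_median_ns"] = df["tst_median"].astype("int64")

medians = (
    df.groupby("route_id", observed=False)["tst_median_ns"].median().reset_index()
)

# Convert the median nanosecond values back to tz-aware timestamps and present
# them in local time, mirroring the Europe/Helsinki conversion in the diff.
medians["tst_median"] = pd.to_datetime(medians["tst_median_ns"], utc=True).dt.tz_convert(
    "Europe/Helsinki"
)
print(medians[["route_id", "tst_median"]])
```

In the patch itself the same idea is guarded by `median_cols`/`existing_median_cols`, so only columns actually present are aggregated, and the `tst_median_ns` helper column is dropped once the readable timestamp has been rebuilt.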