Dev #215 (Merged)
146 changes: 0 additions & 146 deletions python/api/routers/hfp.py
@@ -29,7 +29,6 @@

from api.services.hfp import (
get_hfp_data,
get_speeding_data,
upload_missing_preprocess_data_to_db,
)
from api.services.tlp import get_tlp_data, get_tlp_data_as_json
@@ -360,151 +359,6 @@ async def get_tlp_raw_data(
)
return response


@router.get(
"/speeding",
summary="Get speeding data by route id, given speed limit, tst range and bounding box",
description="Returns speeding data in a gzip compressed csv file.",
response_class=GzippedFileResponse,
responses={
200: {
"description": "Successful query. The data is returned as an attachment in the response. "
"File format comes from query parameters: "
"`speeding-export_<from_tst>_<to_tst>_<route_id>_<min_spd>.csv.gz`",
"content": {"application/gzip": {"schema": None, "example": None}},
"headers": {
"Content-Disposition": {
"schema": {
"example": 'attachment; filename="speeding-export_20240915_20240923_2015_20.csv"'
}
}
},
},
204: {"description": "Query returned no data with the given parameters."},
},
)
async def get_speeding(
route_id: int = Query(
default=None,
title="Route ID",
description="JORE ID of the route",
example=2015,
),
min_spd: int = Query(
default=None,
title="Speed limit",
description="Speed limit in km/h",
example=23,
),
from_tst: datetime = Query(
title="Minimum timestamp",
description=(
"The timestamp from which the data will be queried. (tst in HFP payload) "
"Timestamp will be read as UTC"
),
example="2024-09-15T00:00:00",
),
to_tst: datetime = Query(
default=None,
title="Maximum timestamp",
description=(
"The timestamp to which the data will be queried. (tst in HFP payload) "
"Timestamp will be read as UTC"
),
example="2024-09-23T00:00:00",
),
x_min: int = Query(
default=None,
title="x_min",
description="Coordinate of south-west corner of the bounding box (x_min, y_min). Coordinate should be given in ETRS-TM35FIN coordinate system.",
example=378651,
),
y_min: int = Query(
default=None,
title="y_min",
description="Coordinate of south-west corner of the bounding box (x_min, y_min). Coordinate should be given in ETRS-TM35FIN coordinate system.",
example=6677277,
),
x_max: int = Query(
default=None,
title="x_max",
description="Coordinate of north-east corner of the bounding box (x_max, y_max). Coordinate should be given in ETRS-TM35FIN coordinate system.",
example=378893,
),
y_max: int = Query(
default=None,
title="y_max",
description="Coordinate of north-east corner of the bounding box (x_max, y_max). Coordinate should be given in ETRS-TM35FIN coordinate system.",
example=6677652,
),
) -> JSONResponse:
with CustomDbLogHandler("api"):
fetch_start_time = time.time()

input_stream = io.BytesIO()
output_stream = io.BytesIO()

required_params = {
"route_id": route_id,
"min_spd": min_spd,
"from_tst": from_tst,
"to_tst": to_tst,
"x_min": x_min,
"y_min": y_min,
"x_max": x_max,
"y_max": y_max,
}

missing_params = [
param_name
for param_name, param_value in required_params.items()
if param_value is None
]

if missing_params:
logger.error(f"Missing required parameters: {', '.join(missing_params)}")
raise HTTPException(
status_code=HTTPStatus.UNPROCESSABLE_ENTITY,
detail=f"The following parameters are missing: {', '.join(missing_params)}",
)
logger.debug(
f"Fetching speeding data. route_id: {route_id}, min_spd: {min_spd}, from_tst: {from_tst}, to_tst:{to_tst}"
)
data = await get_speeding_data(
route_id,
min_spd,
from_tst,
to_tst,
x_min,
y_min,
x_max,
y_max,
input_stream,
)
logger.debug(f"Speeding data for {route_id} received. Compressing.")
input_stream.seek(0)
with gzip.GzipFile(fileobj=output_stream, mode="wb") as compressed_data_stream:
for data in iter(lambda: input_stream.read(CHUNK_SIZE), b""):
compressed_data_stream.write(data)

filename = create_filename(
"speeding-export_",
from_tst.strftime("%Y%m%d"),
to_tst.strftime("%Y%m%d"),
route_id,
min_spd,
)
response = GzippedFileResponse(
filename=filename, content=output_stream.getvalue()
)

logger.debug(
f"Speeding data fetch and export completed in {int(time.time() - fetch_start_time)} seconds. Exported file: {filename}"
)

return response
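
For reference, the chunked compression used above is a generic pattern: read the CSV buffer in fixed-size blocks and feed them through a GzipFile wrapper. A minimal standalone sketch, assuming an arbitrary CHUNK_SIZE (not the router's actual constant) and in-memory streams:

import gzip
import io

CHUNK_SIZE = 64 * 1024  # assumed block size for illustration

def gzip_stream(input_stream: io.BytesIO) -> bytes:
    """Compress an in-memory stream block by block and return the gzip payload."""
    input_stream.seek(0)
    output_stream = io.BytesIO()
    with gzip.GzipFile(fileobj=output_stream, mode="wb") as gz:
        # iter() with a b"" sentinel keeps reading fixed-size blocks until EOF
        for block in iter(lambda: input_stream.read(CHUNK_SIZE), b""):
            gz.write(block)
    return output_stream.getvalue()

# Example: gzip_stream(io.BytesIO(b"spd_km,oday\n72.0,2024-09-15\n"))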


@router.get(
"/delay_analytics",
summary="Get delay analytics data.",
54 changes: 0 additions & 54 deletions python/api/services/hfp.py
@@ -68,60 +68,6 @@ async def get_hfp_data(
stream.write(row)
return row_count


async def get_speeding_data(
route_id: int,
min_spd: int,
from_tst: datetime,
to_tst: datetime,
x_min: int,
y_min: int,
x_max: int,
y_max: int,
stream: BytesIO,
):
# Speed limit given in km/h. Convert to m/s
min_spd = min_spd / 3.6

async with pool.connection() as conn:
async with conn.cursor().copy(
"""
COPY (
SELECT
(hp.spd * 3.6) AS spd_km,
hp.oday,
hp."start",
hp.direction_id,
hp.vehicle_number,
hp.point_timestamp
FROM
hfp.hfp_point hp
WHERE
hp.spd > %(min_spd)s
AND hp.point_timestamp > %(from_tst)s
AND hp.point_timestamp < %(to_tst)s
AND hp.route_id = '%(route_id)s'
AND hp.hfp_event = 'VP'
AND hp.geom && ST_MakeEnvelope(%(x_min)s, %(y_min)s, %(x_max)s, %(y_max)s, 3067)
) TO STDOUT WITH CSV HEADER
""",
{
"min_spd": min_spd,
"from_tst": from_tst,
"to_tst": to_tst,
"route_id": route_id,
"x_min": x_min,
"y_min": y_min,
"x_max": x_max,
"y_max": y_max,
},
) as copy:
row_count = 0
async for row in copy:
row_count += 1
stream.write(row)
return row_count
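
The removed service follows the same streaming pattern still used by get_hfp_data above: psycopg's COPY ... TO STDOUT is iterated asynchronously and written straight into the caller's buffer. A rough sketch of that pattern only, assuming psycopg 3 with psycopg_pool and a hypothetical connection string; copy_query_to_stream is an illustrative name, not part of this module:

from io import BytesIO

from psycopg_pool import AsyncConnectionPool

pool = AsyncConnectionPool("postgresql://localhost/hfp")  # assumed conninfo

async def copy_query_to_stream(stream: BytesIO) -> int:
    """Stream a COPY ... TO STDOUT result into `stream`; return the number of blocks written."""
    blocks = 0
    async with pool.connection() as conn:
        async with conn.cursor().copy(
            "COPY (SELECT 1 AS one) TO STDOUT WITH CSV HEADER"
        ) as copy:
            async for block in copy:  # psycopg yields the CSV output as raw byte blocks
                blocks += 1
                stream.write(block)
    return blocks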

async def upload_missing_preprocess_data_to_db(
client: FlowAnalyticsContainerClient,
missing_blobs: List[PreprocessBlobModel],
37 changes: 27 additions & 10 deletions python/common/recluster.py
@@ -113,9 +113,6 @@ async def load_preprocess_files(
compressed_data = r[0]
decompressed_csv = decompressor.decompress(compressed_data)
df = pd.read_csv(io.BytesIO(decompressed_csv), sep=";")
df["tst_median"] = pd.to_datetime(df["tst_median"], format="ISO8601").dt.tz_convert(
"UTC"
)
dfs.append(df)

if not dfs:
@@ -416,8 +413,18 @@ def calculate_cluster_features(df: pd.DataFrame, cluster_id_vars_on_2nd_level: l
pd.DataFrame: clusters with descriptive variables
"""

df["tst_median"] = pd.to_datetime(df["tst_median"], format="ISO8601")
df["oday"] = pd.to_datetime(df["oday"])
df = df.copy()

for col in ["lat_median", "long_median", "hdg_median", "weight"]:
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors="coerce")

if "tst_median" in df.columns:
df["tst_median"] = pd.to_datetime(df["tst_median"], errors="coerce", utc=True)
df["tst_median_ns"] = df["tst_median"].astype("int64")
else:
df["tst_median_ns"] = pd.Series(index=df.index, dtype="float64")


clust_counts = df.drop_duplicates(
subset=[
@@ -433,12 +440,23 @@
clust_delay_feats = df.groupby(cluster_id_vars_on_2nd_level, observed=False)["weight"].quantile([0.10, 0.25, 0.5, 0.75, 0.90]).unstack()
clust_delay_feats.columns = [(int(x * 100)) for x in clust_delay_feats.columns]
clust_delay_feats = clust_delay_feats.add_prefix("q_").reset_index()
median_vars = df.groupby(cluster_id_vars_on_2nd_level, observed=False)[["lat_median", "long_median", "tst_median", "hdg_median"]].median().reset_index()

median_cols = ["lat_median", "long_median", "hdg_median", "tst_median_ns"]
existing_median_cols = [c for c in median_cols if c in df.columns]

median_vars = (df.groupby(cluster_id_vars_on_2nd_level, observed=False)[existing_median_cols].median().reset_index())

if "tst_median_ns" in median_vars.columns:
median_vars["tst_median"] = pd.to_datetime(median_vars["tst_median_ns"], utc=True)
median_vars["tst_median"] = median_vars["tst_median"].dt.tz_convert("Europe/Helsinki")
median_vars = median_vars.drop(columns=["tst_median_ns"])

res = median_vars.merge(clust_counts, on=cluster_id_vars_on_2nd_level, how="outer")
res = res.merge(clust_delay_feats, on=cluster_id_vars_on_2nd_level, how="outer")
res["oday_min"] = df.oday.min()
res["oday_max"] = df.oday.max()
res["tst_median"] = res["tst_median"].dt.tz_convert("Europe/Helsinki")

res["oday_min"] = df["oday"].min()
res["oday_max"] = df["oday"].max()

return res
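
The int64 round trip above lets the groupwise "median timestamp" be computed on plain numbers: timestamps become epoch nanoseconds, the median is taken per cluster, and the result is rebuilt as a timezone-aware value in Europe/Helsinki. A standalone sketch of that round trip with made-up data (column names are illustrative):

import pandas as pd

df = pd.DataFrame(
    {
        "cluster_id": [1, 1, 2],
        "tst_median": pd.to_datetime(
            ["2024-09-15T06:00:00Z", "2024-09-15T08:00:00Z", "2024-09-16T12:00:00Z"],
            utc=True,
        ),
    }
)

# Timestamps -> epoch nanoseconds, aggregate, then back to tz-aware timestamps
df["tst_median_ns"] = df["tst_median"].astype("int64")
medians = df.groupby("cluster_id")["tst_median_ns"].median().reset_index()
medians["tst_median"] = pd.to_datetime(medians["tst_median_ns"], utc=True)
medians["tst_median"] = medians["tst_median"].dt.tz_convert("Europe/Helsinki")
medians = medians.drop(columns=["tst_median_ns"])
# cluster 1 -> 2024-09-15 10:00:00+03:00 (midpoint of 06:00Z and 08:00Z)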


@@ -455,7 +473,6 @@ def ui_related_var_modifications(df: pd.DataFrame, seasons_and_months: dict, DEP
df["tst_median"] = pd.to_datetime(df["tst_median"], format="%Y-%m-%d %H:%M:%S", errors="coerce")
df["year"] = df["tst_median"].dt.year
df["season"] = df["tst_median"].dt.month.map(lambda x: get_season(x, seasons_and_months))

for k, v in DCLASS_NAMES.items():
df["dclass"] = df["dclass"].replace(k, v)

4 changes: 4 additions & 0 deletions python/common/utils.py
@@ -1,3 +1,4 @@
import pandas as pd
import logging as logger
from datetime import date, datetime, timedelta, timezone
from typing import Tuple
@@ -53,6 +54,9 @@ def get_target_oday(offset=1):
return start_date

def get_season(month, seasons_and_months):
if month is None or pd.isna(month):
return None

key = [key for key, val in seasons_and_months.items() if month in val][0]
return key.lower()
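
A small usage sketch of the guarded helper, calling the get_season defined above with a hypothetical seasons_and_months mapping (the real mapping is defined elsewhere in the configuration):

# Hypothetical mapping for illustration only
seasons_and_months = {
    "Winter": [12, 1, 2],
    "Spring": [3, 4, 5],
    "Summer": [6, 7, 8],
    "Autumn": [9, 10, 11],
}

print(get_season(7, seasons_and_months))             # "summer"
print(get_season(None, seasons_and_months))          # None
print(get_season(float("nan"), seasons_and_months))  # None, caught by the pd.isna guard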

2 changes: 1 addition & 1 deletion python/durable/reclusterAnalysisActivity/__init__.py
@@ -45,7 +45,7 @@ async def main(input: dict) -> None:
logger.debug("ReclusterAnalysisActivity completed successfully.")

except Exception as e:
logger.debug(f"ReclusterAnalysisActivity error: {e}")
logger.exception("ReclusterAnalysisActivity error")
await set_recluster_status(
table,
from_oday,
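
The logging change swaps an f-string debug message for logger.exception, which records the message at ERROR level together with the active traceback. A minimal illustration, independent of the durable-functions wiring:

import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("recluster")

try:
    raise ValueError("boom")
except Exception:
    # logger.exception() logs at ERROR level and appends the current traceback;
    # logger.debug(f"... {e}") would have recorded only the message text at DEBUG level.
    logger.exception("ReclusterAnalysisActivity error")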