From 447339018257b625d17a921f0d940b212cfb6118 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20D=C3=ADaz?= Date: Fri, 28 Nov 2025 11:56:25 +0100 Subject: [PATCH 1/3] Adding dag task for event detector --- .vscode/settings.json | 3 ++- dags/pipeline.py | 34 +++++++++++++++++++++++++++++++++- 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index d349ba89..793e56b2 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -7,5 +7,6 @@ "[python]": { "editor.defaultFormatter": "ms-python.black-formatter" }, - "editor.formatOnSaveMode": "modifications" + "editor.formatOnSaveMode": "modifications", + "makefile.configureOnOpen": false } diff --git a/dags/pipeline.py b/dags/pipeline.py index 31051dcc..4e863923 100644 --- a/dags/pipeline.py +++ b/dags/pipeline.py @@ -19,6 +19,7 @@ def run_make_observations( MakeObservationsParams, make_observations, ) + # We need to perform this transformation in here because the {{ ts }} # parmaetrization is resolved at runtime if bucket_date == "": @@ -61,6 +62,21 @@ def run_make_analysis( make_analysis(params) +def run_make_event_detection( + clickhouse_url: str, probe_cc: List[str], timestamp: str = "", ts: str = "" +): + from oonipipeline.tasks.detector import MakeDetectorParams, make_detector + + if timestamp == "": + timestamp = ts[:13] + + params = MakeDetectorParams( + clickhouse_url=clickhouse_url, probe_cc=probe_cc, timestamp=timestamp + ) + + make_detector(params) + + REQUIREMENTS = [str((pathlib.Path(__file__).parent.parent / "oonipipeline").absolute())] with DAG( @@ -158,4 +174,20 @@ def run_make_analysis( system_site_packages=False, ) - op_make_observations_hourly >> op_make_analysis_hourly + op_make_event_detection_hourly = PythonVirtualenvOperator( + task_id="make_event_detection", + python_callable=run_make_event_detection, + op_kwargs={ + "probe_cc": dag_full.params["probe_cc"], + "clickhouse_url": Variable.get("clickhouse_url", default_var=""), + "ts": "{{ts}}", + }, + requirements=REQUIREMENTS, + system_site_packages=False, + ) + + ( + op_make_observations_hourly + >> op_make_analysis_hourly + >> op_make_event_detection_hourly + ) From 8775a56fd3d8b17995f10049b29813cc8045a8d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20D=C3=ADaz?= Date: Fri, 28 Nov 2025 12:15:21 +0100 Subject: [PATCH 2/3] Rename detection to detector --- dags/pipeline.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dags/pipeline.py b/dags/pipeline.py index 4e863923..143a1ae5 100644 --- a/dags/pipeline.py +++ b/dags/pipeline.py @@ -62,7 +62,7 @@ def run_make_analysis( make_analysis(params) -def run_make_event_detection( +def run_make_event_detector( clickhouse_url: str, probe_cc: List[str], timestamp: str = "", ts: str = "" ): from oonipipeline.tasks.detector import MakeDetectorParams, make_detector @@ -174,9 +174,9 @@ def run_make_event_detection( system_site_packages=False, ) - op_make_event_detection_hourly = PythonVirtualenvOperator( - task_id="make_event_detection", - python_callable=run_make_event_detection, + op_make_event_detector_hourly = PythonVirtualenvOperator( + task_id="make_event_detector", + python_callable=run_make_event_detector, op_kwargs={ "probe_cc": dag_full.params["probe_cc"], "clickhouse_url": Variable.get("clickhouse_url", default_var=""), @@ -189,5 +189,5 @@ def run_make_event_detection( ( op_make_observations_hourly >> op_make_analysis_hourly - >> op_make_event_detection_hourly + >> op_make_event_detector_hourly ) From a942b92a4788c7bebf3753a2c51fb2212e835125 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20D=C3=ADaz?= Date: Tue, 2 Dec 2025 13:23:37 +0100 Subject: [PATCH 3/3] Remove unnecessary configuration --- .vscode/settings.json | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 793e56b2..d349ba89 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -7,6 +7,5 @@ "[python]": { "editor.defaultFormatter": "ms-python.black-formatter" }, - "editor.formatOnSaveMode": "modifications", - "makefile.configureOnOpen": false + "editor.formatOnSaveMode": "modifications" }