diff --git a/dags/pipeline.py b/dags/pipeline.py index 31051dcc..143a1ae5 100644 --- a/dags/pipeline.py +++ b/dags/pipeline.py @@ -19,6 +19,7 @@ def run_make_observations( MakeObservationsParams, make_observations, ) + # We need to perform this transformation in here because the {{ ts }} # parmaetrization is resolved at runtime if bucket_date == "": @@ -61,6 +62,21 @@ def run_make_analysis( make_analysis(params) +def run_make_event_detector( + clickhouse_url: str, probe_cc: List[str], timestamp: str = "", ts: str = "" +): + from oonipipeline.tasks.detector import MakeDetectorParams, make_detector + + if timestamp == "": + timestamp = ts[:13] + + params = MakeDetectorParams( + clickhouse_url=clickhouse_url, probe_cc=probe_cc, timestamp=timestamp + ) + + make_detector(params) + + REQUIREMENTS = [str((pathlib.Path(__file__).parent.parent / "oonipipeline").absolute())] with DAG( @@ -158,4 +174,20 @@ def run_make_analysis( system_site_packages=False, ) - op_make_observations_hourly >> op_make_analysis_hourly + op_make_event_detector_hourly = PythonVirtualenvOperator( + task_id="make_event_detector", + python_callable=run_make_event_detector, + op_kwargs={ + "probe_cc": dag_full.params["probe_cc"], + "clickhouse_url": Variable.get("clickhouse_url", default_var=""), + "ts": "{{ts}}", + }, + requirements=REQUIREMENTS, + system_site_packages=False, + ) + + ( + op_make_observations_hourly + >> op_make_analysis_hourly + >> op_make_event_detector_hourly + )