From 9edf29ebff582e52585024d4c9dc31d0d26e2f26 Mon Sep 17 00:00:00 2001 From: ain ghazal Date: Thu, 18 Apr 2024 13:48:24 +0200 Subject: [PATCH 1/5] add openvpn data formats & observations --- oonidata/src/oonidata/models/dataformats.py | 40 ++++++++ .../src/oonidata/models/nettests/__init__.py | 3 + .../src/oonidata/models/nettests/openvpn.py | 36 +++++++ oonidata/src/oonidata/models/observations.py | 85 ++++++++++++++++ oonipipeline/Design.md | 13 +-- oonipipeline/Readme.md | 12 ++- oonipipeline/src/oonipipeline/cli/commands.py | 2 +- .../src/oonipipeline/db/create_tables.py | 2 + .../transforms/measurement_transformer.py | 99 ++++++++++++++++++- .../transforms/nettests/openvpn.py | 20 ++++ .../oonipipeline/transforms/observations.py | 5 + 11 files changed, 306 insertions(+), 11 deletions(-) create mode 100644 oonidata/src/oonidata/models/nettests/openvpn.py create mode 100644 oonipipeline/src/oonipipeline/transforms/nettests/openvpn.py diff --git a/oonidata/src/oonidata/models/dataformats.py b/oonidata/src/oonidata/models/dataformats.py index a478180e..9de2a6db 100644 --- a/oonidata/src/oonidata/models/dataformats.py +++ b/oonidata/src/oonidata/models/dataformats.py @@ -284,6 +284,7 @@ class DNSQuery(BaseModel): dial_id: Optional[int] = None + @add_slots @dataclass class TCPConnectStatus(BaseModel): @@ -365,3 +366,42 @@ class NetworkEvent(BaseModel): # Deprecated fields dial_id: Optional[int] = None conn_id: Optional[int] = None + + +@add_slots +@dataclass +class OpenVPNHandshake(BaseModel): + handshake_time: float + endpoint: str + ip: str # we might want to make this optional, and scrub in favor of ASN/prefix + port: int + transport: str + provider: str + t0: float + t: float + openvpn_options: Optional[Dict[str, str]] = None + tags: Optional[List[str]] = None + transaction_id: Optional[str] = None + failure: Failure = None + +@add_slots +@dataclass +class OpenVPNPacket(BaseModel): + operation: str + opcode: str + id: int + payload_size: int + acks: Optional[List[int]] = None + send_attempts: Optional[int] = None + + +@add_slots +@dataclass +class OpenVPNNetworkEvent(BaseModel): + operation: str + stage: str + t: float + tags: Optional[List[str]] = None + packet: Optional[OpenVPNPacket] = None + transaction_id: Optional[int] = None + diff --git a/oonidata/src/oonidata/models/nettests/__init__.py b/oonidata/src/oonidata/models/nettests/__init__.py index 3a9a015e..db308494 100644 --- a/oonidata/src/oonidata/models/nettests/__init__.py +++ b/oonidata/src/oonidata/models/nettests/__init__.py @@ -13,6 +13,7 @@ from .whatsapp import Whatsapp from .http_invalid_request_line import HTTPInvalidRequestLine from .http_header_field_manipulation import HTTPHeaderFieldManipulation +from .openvpn import OpenVPN SUPPORTED_CLASSES = [ HTTPHeaderFieldManipulation, @@ -27,6 +28,7 @@ Signal, FacebookMessenger, Whatsapp, + OpenVPN, BaseMeasurement, ] SupportedDataformats = Union[ @@ -42,6 +44,7 @@ Signal, FacebookMessenger, Whatsapp, + OpenVPN, BaseMeasurement, ] diff --git a/oonidata/src/oonidata/models/nettests/openvpn.py b/oonidata/src/oonidata/models/nettests/openvpn.py new file mode 100644 index 00000000..a67254c9 --- /dev/null +++ b/oonidata/src/oonidata/models/nettests/openvpn.py @@ -0,0 +1,36 @@ +from dataclasses import dataclass +from typing import List, Optional + +from ..base import BaseModel + +from oonidata.compat import add_slots +from oonidata.models.dataformats import ( + BaseTestKeys, + Failure, + TCPConnect, + OpenVPNHandshake, + OpenVPNNetworkEvent, +) +from oonidata.models.nettests.base_measurement import BaseMeasurement + + +@add_slots +@dataclass +class OpenVPNTestKeys(BaseTestKeys): + success: Optional[bool] = False + failure: Failure = None + + network_events: Optional[List[OpenVPNNetworkEvent]] = None + tcp_connect: Optional[List[TCPConnect]] = None + openvpn_handshake: Optional[List[OpenVPNHandshake]] = None + + bootstrap_time: Optional[float] = None + tunnel: str = None + + +@add_slots +@dataclass +class OpenVPN(BaseMeasurement): + __test_name__ = "openvpn" + + test_keys: OpenVPNTestKeys diff --git a/oonidata/src/oonidata/models/observations.py b/oonidata/src/oonidata/models/observations.py index cce96c82..f9c43d1d 100644 --- a/oonidata/src/oonidata/models/observations.py +++ b/oonidata/src/oonidata/models/observations.py @@ -383,3 +383,88 @@ class HTTPMiddleboxObservation: hfm_diff: Optional[str] = None hfm_failure: Optional[str] = None hfm_success: Optional[bool] = None + + +@table_model( + table_name="obs_openvpn", + table_index=( + "measurement_start_time", + "measurement_uid", + "observation_idx", + ), +) +@dataclass +class OpenVPNObservation: + measurement_meta: MeasurementMeta + + probe_meta: ProbeMeta + + observation_idx: int = 0 + + created_at: Optional[datetime] = None + + timestamp: datetime = None + + # Fields added by the processor + + ip: str = "" + port: int = 0 + transport: str = "" + + success: bool = False + failure: Failure = None + + protocol: str = "" + variant: Optional[str] = None + + # TCP related observation + tcp_failure: Optional[Failure] = None + tcp_success: Optional[bool] = None + tcp_t: Optional[float] = None + + # OpenVPN handshake observation + openvpn_handshake_failure: Optional[Failure] = None + openvpn_handshake_t: Optional[float] = None + openvpn_handshake_t0: Optional[float] = None + openvpn_bootstrap_time: Optional[float] = None + + # timing info about the handshake packets + openvpn_handshake_hr_client_t: Optional[float] = None + openvpn_handshake_hr_server_t: Optional[float] = None + openvpn_handshake_clt_hello_t: Optional[float] = None + openvpn_handshake_srv_hello_t: Optional[float] = None + openvpn_handshake_key_exchg_n: Optional[int] = None + openvpn_handshake_got_keys__t: Optional[float] = None + openvpn_handshake_gen_keys__t: Optional[float] = None + + + + +@table_model( + table_name="obs_tunnel", + table_index= ("measurement_uid", "observation_idx", "measurement_start_time"), +) +@dataclass +class TunnelEndpointObservation: + measurement_meta: MeasurementMeta + probe_meta: ProbeMeta + + measurement_start_time: datetime + + ip: str + port: int + transport: str + + # definition of success will need to change when/if we're able to gather metrics + # through the tunnel. + success: bool + failure: Failure + + protocol: str + family: str + + # indicates obfuscation or modifications from the main protocol family. + variant: Optional[str] = None + + # any metadata about the providers behind the endpoints. + provider: Optional[str] = None diff --git a/oonipipeline/Design.md b/oonipipeline/Design.md index 82323d34..7411d508 100644 --- a/oonipipeline/Design.md +++ b/oonipipeline/Design.md @@ -12,11 +12,11 @@ needed. ### Expose a queriable low level view on measurements -Currently it's only possible to query measurement at a granuliaty which is as -fine a measurement. +Currently it's only possible to query measurement at a granularity which is as +fine as a measurement. This means that it's only possible to answer questions which the original -designer of the experiment had already throught of. +designer of the experiment had already thought of. On the other hand the new pipeline breaks down measurements into distinct observations (think 1 DNS query and answer or 1 TLS handshake towards a @@ -145,16 +145,17 @@ port combination. You can run the observation generation with a clickhouse backend like so: +TODO(art): check this is correct. + ``` -poetry run python -m oonidata mkobs --clickhouse clickhouse://localhost/ --data-dir tests/data/datadir/ --start-day 2022-08-01 --end-day 2022-10-01 --create-tables --parallelism 20 +hatch run oonipipeline --probe-cc US --test-name signal --workflow-name observations --start-at 2022-08-01 --end-at 2022-10-01 ``` Here is the list of supported observations so far: - [x] WebObservation, which has information about DNS, TCP, TLS and HTTP(s) - [x] WebControlObservation, has the control measurements run by web connectivity (is used to generate ground truths) -- [ ] CircumventionToolObservation, still needs to be designed and implemented - (ideally we would use the same for OpenVPN, Psiphon, VanillaTor) +- [x] OpenVPNObservation, with measurements run by the openvpn experiment. ### Response body archiving diff --git a/oonipipeline/Readme.md b/oonipipeline/Readme.md index b1a20b01..9bc42df3 100644 --- a/oonipipeline/Readme.md +++ b/oonipipeline/Readme.md @@ -8,7 +8,7 @@ For historical context, these are the major revisions: - `v1` - OONI Pipeline based on custom CLI scripts using mongodb as a backend. Used until ~2015. - `v2` - OONI Pipeline based on [luigi](https://luigi.readthedocs.io/en/stable/). Used until ~2017. - `v3` - OONI Pipeline based on [airflow](https://airflow.apache.org/). Used until ~2020. -- `v4` - OONI Pipeline basedon custom script and systemd units (aka fastpath). Currently in use in production. +- `v4` - OONI Pipeline based on custom script and systemd units (aka fastpath). Currently in use in production. - `v5` - Next generation OONI Pipeline. What this readme is relevant to. Expected to become in production by Q4 2024. ## Setup @@ -41,13 +41,19 @@ clickhouse server Workflows are started by first scheduling them and then triggering a backfill operation on them. When they are scheduled they will also run on a daily basis. + ``` -hatch run oonipipeline schedule --probe-cc US --test-name signal --create-tables +hatch run oonipipeline schedule --probe-cc US --test-name signal ``` You can then trigger the backfill operation like so: ``` -hatch run oonipipeline backfill --probe-cc US --test-name signal --workflow-name observations --start-at 2024-01-01 --end-at 2024-02-01 +hatch run oonipipeline backfill --create-tables --probe-cc US --test-name signal --workflow-name observations --start-at 2024-01-01 --end-at 2024-02-01 +``` + +If you need to re-create the database tables (because the schema has changed), you want to add the `--drop-tables` flag to the invocation: +``` +hatch run oonipipeline backfill --create-tables --drop-tables --probe-cc US --test-name signal --workflow-name observations --start-at 2024-01-01 --end-at 2024-02-01 ``` You will then need some workers to actually perform the task you backfilled, these can be started like so: diff --git a/oonipipeline/src/oonipipeline/cli/commands.py b/oonipipeline/src/oonipipeline/cli/commands.py index 1e063762..060e8d25 100644 --- a/oonipipeline/src/oonipipeline/cli/commands.py +++ b/oonipipeline/src/oonipipeline/cli/commands.py @@ -189,7 +189,7 @@ async def main(): @click.option( "--analysis/--no-analysis", default=True, - help="should we drop tables before creating them", + help="schedule analysis too", ) def schedule(probe_cc: List[str], test_name: List[str], analysis: bool): """ diff --git a/oonipipeline/src/oonipipeline/db/create_tables.py b/oonipipeline/src/oonipipeline/db/create_tables.py index ee8efb20..6c4a7476 100644 --- a/oonipipeline/src/oonipipeline/db/create_tables.py +++ b/oonipipeline/src/oonipipeline/db/create_tables.py @@ -28,6 +28,7 @@ WebControlObservation, WebObservation, HTTPMiddleboxObservation, + OpenVPNObservation, ) from .connections import ClickhouseConnection @@ -170,6 +171,7 @@ def format_create_query( table_models = [ WebObservation, WebControlObservation, + OpenVPNObservation, HTTPMiddleboxObservation, WebAnalysis, MeasurementExperimentResult, diff --git a/oonipipeline/src/oonipipeline/transforms/measurement_transformer.py b/oonipipeline/src/oonipipeline/transforms/measurement_transformer.py index 30e668fd..e0307e1a 100644 --- a/oonipipeline/src/oonipipeline/transforms/measurement_transformer.py +++ b/oonipipeline/src/oonipipeline/transforms/measurement_transformer.py @@ -24,6 +24,8 @@ NetworkEvent, TCPConnect, TLSHandshake, + OpenVPNHandshake, + OpenVPNNetworkEvent, maybe_binary_data_to_bytes, ) from oonidata.models.nettests.base_measurement import BaseMeasurement @@ -35,6 +37,7 @@ TCPObservation, TLSObservation, WebObservation, + OpenVPNObservation, ) from oonidata.datautils import ( InvalidCertificateChain, @@ -726,6 +729,66 @@ def make_measurement_meta(msmt: BaseMeasurement, bucket_date: str) -> Measuremen measurement_start_time=measurement_start_time, ) +def count_key_exchange_packets(network_events: List[OpenVPNNetworkEvent]) -> int: + """ + return number of packets exchanged in the SENT_KEY state + """ + n = 0 + for evt in network_events: + if evt.stage == "SENT_KEY" and evt.operation.startswith("packet_"): + n+=1 + return n + +def measurement_to_openvpn_observation( + msmt_meta: MeasurementMeta, + probe_meta: ProbeMeta, + netinfodb: NetinfoDB, + openvpn_h: OpenVPNHandshake, + tcp_connect: Optional[List[TCPConnect]], + network_events: Optional[List[OpenVPNNetworkEvent]], + bootstrap_time: float, +) -> OpenVPNObservation: + + oo = OpenVPNObservation( + measurement_meta=msmt_meta, + probe_meta=probe_meta, + failure=normalize_failure(openvpn_h.failure), + timestamp=make_timestamp(msmt_meta.measurement_start_time, openvpn_h.t), + success=openvpn_h.failure == None, + protocol="openvpn", + openvpn_bootstrap_time=bootstrap_time, + ) + + if len(tcp_connect) != 0: + tcp = tcp_connect[0] + oo.tcp_success = tcp.success + oo.tcp_failure = tcp.failure + oo.tcp_t = tcp.t + + oo.handshake_failure = openvpn_h.failure + oo.handshake_t = openvpn_h.t + oo.handshake_t0 = openvpn_h.t0 + + if len(network_events) != 0: + for evt in network_events: + if evt.packet is not None: + if evt.packet.opcode == "P_CONTROL_HARD_RESET_CLIENT_V2": + oo.openvpn_handshake_hr_client_t = evt.t + elif evt.packet.opcode == "P_CONTROL_HARD_RESET_SERVER_V2": + oo.openvpn_handshake_hr_server_t = evt.t + elif "client_hello" in evt.tags: + oo.openvpn_handshake_clt_hello_t = evt.t + elif "server_hello" in evt.tags: + oo.openvpn_handshake_clt_hello_t = evt.t + elif evt.operation == "state" and evt.stage == "GOT_KEY": + oo.openvpn_handshake_got_keys__t = evt.t + elif evt.operation == "state" and evt.stage == "GENERATED_KEYS": + oo.openvpn_handshake_gen_keys__t = evt.t + + oo.openvpn_handshake_key_exchg_n = count_key_exchange_packets(network_events) + + return oo + class MeasurementTransformer: """ @@ -878,7 +941,7 @@ def consume_web_observations( It will attempt to map them via the transaction_id or ip:port tuple. - Any observation that cannot be mapped will be returned inside of it's + Any observation that cannot be mapped will be returned inside of its own WebObservation with all other columns set to None. """ web_obs_list: List[WebObservation] = [] @@ -977,5 +1040,39 @@ def consume_web_observations( return web_obs_list + def make_openvpn_observations(self, + tcp_observations: Optional[List[TCPConnect]], + openvpn_handshakes: Optional[List[OpenVPNHandshake]], + network_events: Optional[List[OpenVPNNetworkEvent]], + bootstrap_time: float, + ) -> List[OpenVPNObservation]: + """ + Returns a list of OpenVPNObservations by mapping all related + TCPObservations, OpenVPNNetworkevents and OpenVPNHandshakes. + """ + openvpn_obs_list: List[OpenVPNObservation] = [] + + for openvpn_handshake in openvpn_handshakes: + openvpn_obs_list.append( + measurement_to_openvpn_observation( + msmt_meta=self.measurement_meta, + probe_meta=self.probe_meta, + netinfodb=self.netinfodb, + tcp_connect=tcp_observations, + openvpn_h=openvpn_handshake, + network_events=network_events, + bootstrap_time=bootstrap_time, + ) + ) + + # TODO: can factor out function with web_observation + for idx, obs in enumerate(openvpn_obs_list): + obs.observation_id = f"{obs.measurement_meta.measurement_uid}_{idx}" + obs.created_at = datetime.now(timezone.utc).replace( + microsecond=0, tzinfo=None + ) + + return openvpn_obs_list + def make_observations(self, measurement): assert RuntimeError("make_observations is not implemented") diff --git a/oonipipeline/src/oonipipeline/transforms/nettests/openvpn.py b/oonipipeline/src/oonipipeline/transforms/nettests/openvpn.py new file mode 100644 index 00000000..9904c295 --- /dev/null +++ b/oonipipeline/src/oonipipeline/transforms/nettests/openvpn.py @@ -0,0 +1,20 @@ +from typing import List, Tuple +from oonidata.models.nettests import OpenVPN +from oonidata.models.observations import OpenVPNObservation + +from ..measurement_transformer import MeasurementTransformer + + +class OpenVPNTransformer(MeasurementTransformer): + def make_observations(self, msmt: OpenVPN) -> Tuple[List[OpenVPNObservation]]: + if not msmt.test_keys: + return ([],) + + openvpn_obs = self.make_openvpn_observations( + tcp_observations=self.make_tcp_observations(msmt.test_keys.tcp_connect), + openvpn_handshakes=msmt.test_keys.openvpn_handshake, + network_events=msmt.test_keys.network_events, + bootstrap_time=msmt.test_keys.bootstrap_time, + ) + + return (openvpn_obs, ) diff --git a/oonipipeline/src/oonipipeline/transforms/observations.py b/oonipipeline/src/oonipipeline/transforms/observations.py index 5002c830..62ab0fbd 100644 --- a/oonipipeline/src/oonipipeline/transforms/observations.py +++ b/oonipipeline/src/oonipipeline/transforms/observations.py @@ -5,6 +5,7 @@ HTTPMiddleboxObservation, WebControlObservation, WebObservation, + OpenVPNObservation ) from oonidata.models.nettests import ( @@ -34,6 +35,7 @@ from .nettests.browser_web import BrowserWebTransformer from .nettests.urlgetter import UrlGetterTransformer from .nettests.web_connectivity import WebConnectivityTransformer +from .nettests.openvpn import OpenVPNTransformer from .nettests.http_invalid_request_line import ( HTTPInvalidRequestLineTransformer, ) @@ -53,12 +55,14 @@ "http_header_field_manipulation": HTTPHeaderFieldManipulationTransformer, "http_invalid_request_line": HTTPInvalidRequestLineTransformer, "web_connectivity": WebConnectivityTransformer, + "openvpn": OpenVPNTransformer, } TypeWebConnectivityObservations = Tuple[ List[WebObservation], List[WebControlObservation] ] TypeWebObservations = Tuple[List[WebObservation]] +TypeOpenVPNObservations = Tuple[List[OpenVPNObservation]] TypeHTTPMiddleboxObservations = Tuple[List[HTTPMiddleboxObservation]] @@ -107,6 +111,7 @@ def measurement_to_observations( TypeWebObservations, TypeWebConnectivityObservations, TypeHTTPMiddleboxObservations, + TypeOpenVPNObservations, Tuple[()], ]: if msmt.test_name in NETTEST_TRANSFORMERS: From 9d8994cd79aecf669cfcfa47a8cb55dbf9e9b073 Mon Sep 17 00:00:00 2001 From: ain ghazal Date: Sun, 29 Sep 2024 14:49:33 +0200 Subject: [PATCH 2/5] add tests for openvpn observations --- .../transforms/measurement_transformer.py | 14 ++++-- oonipipeline/tests/_fixtures.py | 2 + oonipipeline/tests/test_transforms.py | 49 +++++++++++++++++++ 3 files changed, 60 insertions(+), 5 deletions(-) diff --git a/oonipipeline/src/oonipipeline/transforms/measurement_transformer.py b/oonipipeline/src/oonipipeline/transforms/measurement_transformer.py index e0307e1a..25a1525d 100644 --- a/oonipipeline/src/oonipipeline/transforms/measurement_transformer.py +++ b/oonipipeline/src/oonipipeline/transforms/measurement_transformer.py @@ -756,6 +756,9 @@ def measurement_to_openvpn_observation( timestamp=make_timestamp(msmt_meta.measurement_start_time, openvpn_h.t), success=openvpn_h.failure == None, protocol="openvpn", + transport = openvpn_h.transport, + ip = openvpn_h.ip, + port = openvpn_h.port, openvpn_bootstrap_time=bootstrap_time, ) @@ -769,6 +772,7 @@ def measurement_to_openvpn_observation( oo.handshake_t = openvpn_h.t oo.handshake_t0 = openvpn_h.t0 + # TODO(ain): condition to test version >= xyz if len(network_events) != 0: for evt in network_events: if evt.packet is not None: @@ -779,11 +783,11 @@ def measurement_to_openvpn_observation( elif "client_hello" in evt.tags: oo.openvpn_handshake_clt_hello_t = evt.t elif "server_hello" in evt.tags: - oo.openvpn_handshake_clt_hello_t = evt.t - elif evt.operation == "state" and evt.stage == "GOT_KEY": - oo.openvpn_handshake_got_keys__t = evt.t - elif evt.operation == "state" and evt.stage == "GENERATED_KEYS": - oo.openvpn_handshake_gen_keys__t = evt.t + oo.openvpn_handshake_srv_hello_t = evt.t + if evt.operation == "state" and evt.stage == "GOT_KEY": + oo.openvpn_handshake_got_keys__t = evt.t + if evt.operation == "state" and evt.stage == "GENERATED_KEYS": + oo.openvpn_handshake_gen_keys__t = evt.t oo.openvpn_handshake_key_exchg_n = count_key_exchange_packets(network_events) diff --git a/oonipipeline/tests/_fixtures.py b/oonipipeline/tests/_fixtures.py index 358dbe95..398d47db 100644 --- a/oonipipeline/tests/_fixtures.py +++ b/oonipipeline/tests/_fixtures.py @@ -38,6 +38,8 @@ "20240302000050.000654_SN_webconnectivity_fe4221088fbdcb0a", # nxdomain down "20240302000305.316064_EG_webconnectivity_397bca9091b07444", # nxdomain blocked, unknown_failure and from the future "20240309112858.009725_SE_webconnectivity_dce757ef4ec9b6c8", # blockpage for Iran in Sweden + "20240923234302.648951_FI_openvpn_714dd28ff412c1a5", # openvpn from Finland, tcp + "20240923234302.024724_FI_openvpn_515e6b6d9c0d832e", # openvpn from Finland, udp ] SAMPLE_POSTCANS = ["2024030100_AM_webconnectivity.n1.0.tar.gz"] diff --git a/oonipipeline/tests/test_transforms.py b/oonipipeline/tests/test_transforms.py index 5261360b..a07b3d9c 100644 --- a/oonipipeline/tests/test_transforms.py +++ b/oonipipeline/tests/test_transforms.py @@ -11,6 +11,7 @@ from oonidata.models.nettests.stun_reachability import StunReachability from oonidata.models.nettests.urlgetter import UrlGetter from oonidata.models.nettests.browser_web import BrowserWeb +from oonidata.models.nettests.openvpn import OpenVPN from oonidata.models.observations import WebObservation from oonipipeline.transforms.measurement_transformer import ( @@ -391,3 +392,51 @@ def test_facebook_messenger_obs(netinfodb, measurements): hostname_set.add(wo.hostname) assert hostname_set == spec_hostname_set assert len(web_obs) == 14 + +def test_openvpn_obs(netinfodb, measurements): + bucket_date = "2024-09-23" + + msmt_udp = load_measurement( + msmt_path=measurements[ + "20240923234302.024724_FI_openvpn_515e6b6d9c0d832e" + ] + ) + assert isinstance(msmt_udp, OpenVPN) + obs_tup_udp = measurement_to_observations( + msmt=msmt_udp, netinfodb=netinfodb, bucket_date=bucket_date + ) + assert len(obs_tup_udp) == 1 + oou: OpenVPNObservation = obs_tup_udp[0][0] + + assert oou.success is True + assert oou.transport == "udp" + assert oou.port == 1194 + assert oou.ip == "37.218.243.98" + assert oou.tcp_success is None + assert oou.openvpn_handshake_srv_hello_t == 0.175448177 + assert oou.openvpn_handshake_got_keys__t == 0.305975312 + assert oou.openvpn_handshake_gen_keys__t == 0.376011823 + assert oou.openvpn_bootstrap_time==0.376279583 + + msmt_tcp = load_measurement( + msmt_path=measurements[ + "20240923234302.648951_FI_openvpn_714dd28ff412c1a5" + ] + ) + assert isinstance(msmt_tcp, OpenVPN) + obs_tup_tcp = measurement_to_observations( + msmt=msmt_tcp, netinfodb=netinfodb, bucket_date=bucket_date + ) + assert len(obs_tup_tcp) == 1 + oot: OpenVPNObservation = obs_tup_tcp[0][0] + + assert oot.success is True + assert oot.transport == "tcp" + assert oot.port == 1194 + assert oot.ip == "37.218.243.98" + assert oot.tcp_success is True + assert oot.tcp_t == 0.053010729 + assert oot.openvpn_handshake_hr_client_t==0.05684776 + assert oot.openvpn_handshake_srv_hello_t==0.204483958 + assert oot.openvpn_handshake_gen_keys__t==0.571443906 + assert oot.openvpn_bootstrap_time==0.571501093 From a23dda2ab29a708b99424e5077462ca8071923ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Mon, 3 Feb 2025 18:40:30 +0100 Subject: [PATCH 3/5] Refactor the openvpn tunnel observation model --- oonidata/src/oonidata/models/observations.py | 25 ++-- .../transforms/measurement_transformer.py | 101 +-------------- .../transforms/nettests/openvpn.py | 115 ++++++++++++++++-- 3 files changed, 121 insertions(+), 120 deletions(-) diff --git a/oonidata/src/oonidata/models/observations.py b/oonidata/src/oonidata/models/observations.py index bef538e2..836c2940 100644 --- a/oonidata/src/oonidata/models/observations.py +++ b/oonidata/src/oonidata/models/observations.py @@ -2,6 +2,7 @@ from dataclasses import dataclass, field from datetime import datetime from typing import ( + Dict, Optional, List, Tuple, @@ -443,33 +444,41 @@ class OpenVPNObservation: openvpn_handshake_gen_keys__t: Optional[float] = None - - @table_model( table_name="obs_tunnel", - table_index= ("measurement_uid", "observation_idx", "measurement_start_time"), + table_index=("measurement_uid", "observation_idx", "measurement_start_time"), ) @dataclass -class TunnelEndpointObservation: +class TunnelObservation: measurement_meta: MeasurementMeta probe_meta: ProbeMeta - measurement_start_time: datetime + observation_idx: int ip: str port: int transport: str + # label can be a fqdn or a human readable lable used to group the endpoint + label: str + # definition of success will need to change when/if we're able to gather metrics # through the tunnel. success: bool failure: Failure protocol: str - family: str # indicates obfuscation or modifications from the main protocol family. - variant: Optional[str] = None + variant: str = "" # any metadata about the providers behind the endpoints. - provider: Optional[str] = None + provider: str = "" + + # time it took to perform the bootstrap of the protocol + bootstrap_time: float = -1 + + # timing list + t0: float = -1 + timing_map: Dict[str, float] = field(default_factory=dict) + failure_map: Dict[str, str] = field(default_factory=dict) diff --git a/oonipipeline/src/oonipipeline/transforms/measurement_transformer.py b/oonipipeline/src/oonipipeline/transforms/measurement_transformer.py index 64531487..e7bfdcc7 100644 --- a/oonipipeline/src/oonipipeline/transforms/measurement_transformer.py +++ b/oonipipeline/src/oonipipeline/transforms/measurement_transformer.py @@ -37,7 +37,7 @@ TCPObservation, TLSObservation, WebObservation, - OpenVPNObservation, + TunnelObservation, ) from oonidata.datautils import ( InvalidCertificateChain, @@ -734,71 +734,6 @@ def make_measurement_meta(msmt: BaseMeasurement, bucket_date: str) -> Measuremen measurement_start_time=measurement_start_time, ) -def count_key_exchange_packets(network_events: List[OpenVPNNetworkEvent]) -> int: - """ - return number of packets exchanged in the SENT_KEY state - """ - n = 0 - for evt in network_events: - if evt.stage == "SENT_KEY" and evt.operation.startswith("packet_"): - n+=1 - return n - -def measurement_to_openvpn_observation( - msmt_meta: MeasurementMeta, - probe_meta: ProbeMeta, - netinfodb: NetinfoDB, - openvpn_h: OpenVPNHandshake, - tcp_connect: Optional[List[TCPConnect]], - network_events: Optional[List[OpenVPNNetworkEvent]], - bootstrap_time: float, -) -> OpenVPNObservation: - - oo = OpenVPNObservation( - measurement_meta=msmt_meta, - probe_meta=probe_meta, - failure=normalize_failure(openvpn_h.failure), - timestamp=make_timestamp(msmt_meta.measurement_start_time, openvpn_h.t), - success=openvpn_h.failure == None, - protocol="openvpn", - transport = openvpn_h.transport, - ip = openvpn_h.ip, - port = openvpn_h.port, - openvpn_bootstrap_time=bootstrap_time, - ) - - if len(tcp_connect) != 0: - tcp = tcp_connect[0] - oo.tcp_success = tcp.success - oo.tcp_failure = tcp.failure - oo.tcp_t = tcp.t - - oo.handshake_failure = openvpn_h.failure - oo.handshake_t = openvpn_h.t - oo.handshake_t0 = openvpn_h.t0 - - # TODO(ain): condition to test version >= xyz - if len(network_events) != 0: - for evt in network_events: - if evt.packet is not None: - if evt.packet.opcode == "P_CONTROL_HARD_RESET_CLIENT_V2": - oo.openvpn_handshake_hr_client_t = evt.t - elif evt.packet.opcode == "P_CONTROL_HARD_RESET_SERVER_V2": - oo.openvpn_handshake_hr_server_t = evt.t - elif "client_hello" in evt.tags: - oo.openvpn_handshake_clt_hello_t = evt.t - elif "server_hello" in evt.tags: - oo.openvpn_handshake_srv_hello_t = evt.t - if evt.operation == "state" and evt.stage == "GOT_KEY": - oo.openvpn_handshake_got_keys__t = evt.t - if evt.operation == "state" and evt.stage == "GENERATED_KEYS": - oo.openvpn_handshake_gen_keys__t = evt.t - - oo.openvpn_handshake_key_exchg_n = count_key_exchange_packets(network_events) - - return oo - - class MeasurementTransformer: """ MeasurementTransformer is responsible for taking a measurement and @@ -1049,39 +984,5 @@ def consume_web_observations( return web_obs_list - def make_openvpn_observations(self, - tcp_observations: Optional[List[TCPConnect]], - openvpn_handshakes: Optional[List[OpenVPNHandshake]], - network_events: Optional[List[OpenVPNNetworkEvent]], - bootstrap_time: float, - ) -> List[OpenVPNObservation]: - """ - Returns a list of OpenVPNObservations by mapping all related - TCPObservations, OpenVPNNetworkevents and OpenVPNHandshakes. - """ - openvpn_obs_list: List[OpenVPNObservation] = [] - - for openvpn_handshake in openvpn_handshakes: - openvpn_obs_list.append( - measurement_to_openvpn_observation( - msmt_meta=self.measurement_meta, - probe_meta=self.probe_meta, - netinfodb=self.netinfodb, - tcp_connect=tcp_observations, - openvpn_h=openvpn_handshake, - network_events=network_events, - bootstrap_time=bootstrap_time, - ) - ) - - # TODO: can factor out function with web_observation - for idx, obs in enumerate(openvpn_obs_list): - obs.observation_id = f"{obs.measurement_meta.measurement_uid}_{idx}" - obs.created_at = datetime.now(timezone.utc).replace( - microsecond=0, tzinfo=None - ) - - return openvpn_obs_list - def make_observations(self, measurement): assert RuntimeError("make_observations is not implemented") diff --git a/oonipipeline/src/oonipipeline/transforms/nettests/openvpn.py b/oonipipeline/src/oonipipeline/transforms/nettests/openvpn.py index 9904c295..92644938 100644 --- a/oonipipeline/src/oonipipeline/transforms/nettests/openvpn.py +++ b/oonipipeline/src/oonipipeline/transforms/nettests/openvpn.py @@ -1,20 +1,111 @@ -from typing import List, Tuple +from datetime import datetime +from typing import Dict, List, Optional, Tuple +from oonidata.models.dataformats import OpenVPNHandshake, OpenVPNNetworkEvent from oonidata.models.nettests import OpenVPN -from oonidata.models.observations import OpenVPNObservation +from oonidata.models.observations import ( + OpenVPNObservation, + TunnelObservation, + WebObservation, +) -from ..measurement_transformer import MeasurementTransformer +from ..measurement_transformer import MeasurementTransformer, normalize_failure +from ..measurement_transformer import measurement_to_openvpn_observation + + +def count_key_exchange_packets(network_events: List[OpenVPNNetworkEvent]) -> int: + """ + return number of packets exchanged in the SENT_KEY state + """ + n = 0 + for evt in network_events: + if evt.stage == "SENT_KEY" and evt.operation.startswith("packet_"): + n += 1 + return n + + +def make_openvpn_timing_map( + network_events: List[OpenVPNNetworkEvent], +) -> Dict[str, float]: + + timings = {} + # TODO(ain): condition to test version >= xyz + if len(network_events) != 0: + for evt in network_events: + if evt.packet is not None: + if evt.packet.opcode == "P_CONTROL_HARD_RESET_CLIENT_V2": + timings["openvpn_handshake_hr_client"] = evt.t + elif evt.packet.opcode == "P_CONTROL_HARD_RESET_SERVER_V2": + timings["openvpn_handshake_hr_server"] = evt.t + elif evt.tags and "client_hello" in evt.tags: + timings["openvpn_handshake_clt_hello"] = evt.t + elif evt.tags and "server_hello" in evt.tags: + timings["openvpn_handshake_srv_hello"] = evt.t + if evt.operation == "state" and evt.stage == "GOT_KEY": + timings["openvpn_handshake_got_keys"] = evt.t + if evt.operation == "state" and evt.stage == "GENERATED_KEYS": + timings["openvpn_handshake_gen_keys"] = evt.t + + timings["openvpn_handshake_key_exchg_n"] = count_key_exchange_packets( + network_events + ) + + return timings class OpenVPNTransformer(MeasurementTransformer): - def make_observations(self, msmt: OpenVPN) -> Tuple[List[OpenVPNObservation]]: + + def make_observations( + self, msmt: OpenVPN + ) -> Tuple[List[OpenVPNObservation], List[WebObservation]]: if not msmt.test_keys: - return ([],) + return ([], []) - openvpn_obs = self.make_openvpn_observations( - tcp_observations=self.make_tcp_observations(msmt.test_keys.tcp_connect), - openvpn_handshakes=msmt.test_keys.openvpn_handshake, - network_events=msmt.test_keys.network_events, - bootstrap_time=msmt.test_keys.bootstrap_time, - ) + # def make_openvpn_observations( + # self, + # tcp_observations: Optional[List[TCPConnect]], + # openvpn_handshakes: List[OpenVPNHandshake], + # network_events: Optional[List[OpenVPNNetworkEvent]], + # bootstrap_time: float, + # ) -> List[TunnelObservation]: + # """ + # Returns a list of OpenVPNObservations by mapping all related + # TCPObservations, OpenVPNNetworkevents and OpenVPNHandshakes. + # """ + + openvpn_obs_list: List[TunnelObservation] = [] + + assert msmt.test_keys is not None + assert msmt.test_keys.openvpn_handshake is not None + idx = 1 + for hs in msmt.test_keys.openvpn_handshake: + to = TunnelObservation( + measurement_meta=self.measurement_meta, + probe_meta=self.probe_meta, + failure=normalize_failure(hs.failure), + success=hs.failure == None, + label="", + protocol="openvpn", + transport=hs.transport, + ip=hs.ip, + observation_idx=idx, + port=hs.port, + bootstrap_time=msmt.test_keys.bootstrap_time or -1, + ) - return (openvpn_obs, ) + to.timing_map = make_openvpn_timing_map(msmt.test_keys.network_events or []) + to.timing_map["handshake_t"] = hs.t + to.timing_map["handshake_t0"] = hs.t0 + to.failure_map["handshake"] = hs.failure or "" + idx += 1 + + openvpn_obs_list.append(to) + + web_observations = ( + self.consume_web_observations( + dns_observations=[], + tcp_observations=self.make_tcp_observations(msmt.test_keys.tcp_connect), + tls_observations=[], + http_observations=[], + ), + ) + return (openvpn_obs, web_observations) From 29289dbcc8ca8ffdf330ba7fa11c1a0920e414e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Mon, 3 Feb 2025 18:49:49 +0100 Subject: [PATCH 4/5] Fix broken tests --- oonidata/src/oonidata/models/observations.py | 53 ------------------- .../src/oonipipeline/db/create_tables.py | 4 +- .../transforms/nettests/openvpn.py | 22 ++++---- .../oonipipeline/transforms/observations.py | 15 ++++-- oonipipeline/tests/test_transforms.py | 32 +++++------ 5 files changed, 39 insertions(+), 87 deletions(-) diff --git a/oonidata/src/oonidata/models/observations.py b/oonidata/src/oonidata/models/observations.py index 836c2940..fbfacb8a 100644 --- a/oonidata/src/oonidata/models/observations.py +++ b/oonidata/src/oonidata/models/observations.py @@ -391,59 +391,6 @@ class HTTPMiddleboxObservation: hfm_success: Optional[bool] = None -@table_model( - table_name="obs_openvpn", - table_index=( - "measurement_start_time", - "measurement_uid", - "observation_idx", - ), -) -@dataclass -class OpenVPNObservation: - measurement_meta: MeasurementMeta - - probe_meta: ProbeMeta - - observation_idx: int = 0 - - created_at: Optional[datetime] = None - - timestamp: datetime = None - - # Fields added by the processor - - ip: str = "" - port: int = 0 - transport: str = "" - - success: bool = False - failure: Failure = None - - protocol: str = "" - variant: Optional[str] = None - - # TCP related observation - tcp_failure: Optional[Failure] = None - tcp_success: Optional[bool] = None - tcp_t: Optional[float] = None - - # OpenVPN handshake observation - openvpn_handshake_failure: Optional[Failure] = None - openvpn_handshake_t: Optional[float] = None - openvpn_handshake_t0: Optional[float] = None - openvpn_bootstrap_time: Optional[float] = None - - # timing info about the handshake packets - openvpn_handshake_hr_client_t: Optional[float] = None - openvpn_handshake_hr_server_t: Optional[float] = None - openvpn_handshake_clt_hello_t: Optional[float] = None - openvpn_handshake_srv_hello_t: Optional[float] = None - openvpn_handshake_key_exchg_n: Optional[int] = None - openvpn_handshake_got_keys__t: Optional[float] = None - openvpn_handshake_gen_keys__t: Optional[float] = None - - @table_model( table_name="obs_tunnel", table_index=("measurement_uid", "observation_idx", "measurement_start_time"), diff --git a/oonipipeline/src/oonipipeline/db/create_tables.py b/oonipipeline/src/oonipipeline/db/create_tables.py index 7c254e39..9fb7fb25 100644 --- a/oonipipeline/src/oonipipeline/db/create_tables.py +++ b/oonipipeline/src/oonipipeline/db/create_tables.py @@ -23,7 +23,7 @@ WebControlObservation, WebObservation, HTTPMiddleboxObservation, - OpenVPNObservation, + TunnelObservation, ) from .connections import ClickhouseConnection @@ -166,7 +166,7 @@ def format_create_query( table_models = [ WebObservation, WebControlObservation, - OpenVPNObservation, + TunnelObservation, HTTPMiddleboxObservation, ] diff --git a/oonipipeline/src/oonipipeline/transforms/nettests/openvpn.py b/oonipipeline/src/oonipipeline/transforms/nettests/openvpn.py index 92644938..c45a7c7d 100644 --- a/oonipipeline/src/oonipipeline/transforms/nettests/openvpn.py +++ b/oonipipeline/src/oonipipeline/transforms/nettests/openvpn.py @@ -3,13 +3,11 @@ from oonidata.models.dataformats import OpenVPNHandshake, OpenVPNNetworkEvent from oonidata.models.nettests import OpenVPN from oonidata.models.observations import ( - OpenVPNObservation, TunnelObservation, WebObservation, ) from ..measurement_transformer import MeasurementTransformer, normalize_failure -from ..measurement_transformer import measurement_to_openvpn_observation def count_key_exchange_packets(network_events: List[OpenVPNNetworkEvent]) -> int: @@ -56,7 +54,7 @@ class OpenVPNTransformer(MeasurementTransformer): def make_observations( self, msmt: OpenVPN - ) -> Tuple[List[OpenVPNObservation], List[WebObservation]]: + ) -> Tuple[List[TunnelObservation], List[WebObservation]]: if not msmt.test_keys: return ([], []) @@ -72,7 +70,7 @@ def make_observations( # TCPObservations, OpenVPNNetworkevents and OpenVPNHandshakes. # """ - openvpn_obs_list: List[TunnelObservation] = [] + tunnel_observations: List[TunnelObservation] = [] assert msmt.test_keys is not None assert msmt.test_keys.openvpn_handshake is not None @@ -98,14 +96,12 @@ def make_observations( to.failure_map["handshake"] = hs.failure or "" idx += 1 - openvpn_obs_list.append(to) + tunnel_observations.append(to) - web_observations = ( - self.consume_web_observations( - dns_observations=[], - tcp_observations=self.make_tcp_observations(msmt.test_keys.tcp_connect), - tls_observations=[], - http_observations=[], - ), + web_observations = self.consume_web_observations( + dns_observations=[], + tcp_observations=self.make_tcp_observations(msmt.test_keys.tcp_connect), + tls_observations=[], + http_observations=[], ) - return (openvpn_obs, web_observations) + return (tunnel_observations, web_observations) diff --git a/oonipipeline/src/oonipipeline/transforms/observations.py b/oonipipeline/src/oonipipeline/transforms/observations.py index 24660ab4..e3d8cf05 100644 --- a/oonipipeline/src/oonipipeline/transforms/observations.py +++ b/oonipipeline/src/oonipipeline/transforms/observations.py @@ -3,9 +3,9 @@ from oonidata.models.observations import ( HTTPMiddleboxObservation, + TunnelObservation, WebControlObservation, WebObservation, - OpenVPNObservation ) from oonidata.models.nettests import ( @@ -20,6 +20,7 @@ UrlGetter, WebConnectivity, HTTPInvalidRequestLine, + OpenVPN, ) from .nettests.dnscheck import DNSCheckTransformer @@ -63,8 +64,8 @@ TypeWebConnectivityObservations = Tuple[ List[WebObservation], List[WebControlObservation] ] +TypeTunnelObservations = Tuple[List[TunnelObservation], List[WebObservation]] TypeWebObservations = Tuple[List[WebObservation]] -TypeOpenVPNObservations = Tuple[List[OpenVPNObservation]] TypeHTTPMiddleboxObservations = Tuple[List[HTTPMiddleboxObservation]] @@ -94,6 +95,14 @@ def measurement_to_observations( ) -> TypeWebObservations: ... +@overload +def measurement_to_observations( + msmt: OpenVPN, + netinfodb: NetinfoDB, + bucket_date: str = "1984-01-01", +) -> TypeTunnelObservations: ... + + @overload def measurement_to_observations( msmt: SupportedDataformats, @@ -113,7 +122,7 @@ def measurement_to_observations( TypeWebObservations, TypeWebConnectivityObservations, TypeHTTPMiddleboxObservations, - TypeOpenVPNObservations, + TypeTunnelObservations, Tuple[()], ]: if msmt.test_name in NETTEST_TRANSFORMERS: diff --git a/oonipipeline/tests/test_transforms.py b/oonipipeline/tests/test_transforms.py index c5b92948..441734e0 100644 --- a/oonipipeline/tests/test_transforms.py +++ b/oonipipeline/tests/test_transforms.py @@ -12,7 +12,7 @@ from oonidata.models.nettests.urlgetter import UrlGetter from oonidata.models.nettests.browser_web import BrowserWeb from oonidata.models.nettests.openvpn import OpenVPN -from oonidata.models.observations import WebObservation +from oonidata.models.observations import WebObservation, TunnelObservation from oonipipeline.transforms.measurement_transformer import ( MeasurementTransformer, @@ -423,18 +423,18 @@ def test_openvpn_obs(netinfodb, measurements): obs_tup_udp = measurement_to_observations( msmt=msmt_udp, netinfodb=netinfodb, bucket_date=bucket_date ) - assert len(obs_tup_udp) == 1 - oou: OpenVPNObservation = obs_tup_udp[0][0] + assert len(obs_tup_udp) == 2 + oou: TunnelObservation = obs_tup_udp[0][0] assert oou.success is True assert oou.transport == "udp" assert oou.port == 1194 assert oou.ip == "37.218.243.98" - assert oou.tcp_success is None - assert oou.openvpn_handshake_srv_hello_t == 0.175448177 - assert oou.openvpn_handshake_got_keys__t == 0.305975312 - assert oou.openvpn_handshake_gen_keys__t == 0.376011823 - assert oou.openvpn_bootstrap_time==0.376279583 + # assert oou.tcp_success is None + assert oou.timing_map["openvpn_handshake_srv_hello"] == 0.175448177 + assert oou.timing_map["openvpn_handshake_got_keys"] == 0.305975312 + assert oou.timing_map["openvpn_handshake_gen_keys"] == 0.376011823 + assert oou.bootstrap_time == 0.376279583 msmt_tcp = load_measurement( msmt_path=measurements[ @@ -445,19 +445,19 @@ def test_openvpn_obs(netinfodb, measurements): obs_tup_tcp = measurement_to_observations( msmt=msmt_tcp, netinfodb=netinfodb, bucket_date=bucket_date ) - assert len(obs_tup_tcp) == 1 - oot: OpenVPNObservation = obs_tup_tcp[0][0] + assert len(obs_tup_tcp) == 2 + oot: TunnelObservation = obs_tup_tcp[0][0] assert oot.success is True assert oot.transport == "tcp" assert oot.port == 1194 assert oot.ip == "37.218.243.98" - assert oot.tcp_success is True - assert oot.tcp_t == 0.053010729 - assert oot.openvpn_handshake_hr_client_t==0.05684776 - assert oot.openvpn_handshake_srv_hello_t==0.204483958 - assert oot.openvpn_handshake_gen_keys__t==0.571443906 - assert oot.openvpn_bootstrap_time==0.571501093 + # assert oot.tcp_success is True + # assert oot.tcp_t == 0.053010729 + assert oot.timing_map["openvpn_handshake_hr_client"] == 0.05684776 + assert oot.timing_map["openvpn_handshake_srv_hello"] == 0.204483958 + assert oot.timing_map["openvpn_handshake_gen_keys"] == 0.571443906 + assert oot.bootstrap_time == 0.571501093 def test_echcheck_obs_tls_handshakes(netinfodb, measurements): msmt = load_measurement( From 20a320d8c3b110b46ff749eca9275ac03010aff5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Mon, 3 Feb 2025 19:01:09 +0100 Subject: [PATCH 5/5] Update create_table to support Map(string, float) --- oonipipeline/src/oonipipeline/db/create_tables.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/oonipipeline/src/oonipipeline/db/create_tables.py b/oonipipeline/src/oonipipeline/db/create_tables.py index 9fb7fb25..3e3d6c1a 100644 --- a/oonipipeline/src/oonipipeline/db/create_tables.py +++ b/oonipipeline/src/oonipipeline/db/create_tables.py @@ -90,6 +90,9 @@ def typing_to_clickhouse(t: Any) -> str: if t in (Mapping[str, str], Dict[str, str]): return "Map(String, String)" + if t in (Mapping[str, float], Dict[str, float]): + return "Map(String, Float64)" + # TODO(art): eventually all the above types should be mapped using a similar pattern child_type, parent_type = typing.get_args(t) is_nullable = False