diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml new file mode 100644 index 0000000..42cd695 --- /dev/null +++ b/.github/workflows/test.yaml @@ -0,0 +1,22 @@ +name: Tests et couverture + +on: + push: + branches: [ main, develop ] + pull_request: + branches: [ main ] + +jobs: + test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.10' + - name: Install dependencies + run: | + pip install -e .[dev] + - name: Run tests with coverage + run: pytest --cov=src --cov-report=term diff --git a/.gitignore b/.gitignore index c36d3c7..8319912 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ logs .venv *.egg-info *.parquet +!tests/data/sirene/*.parquet !code_officiel_geographique.parquet *.gz *.zip diff --git a/CHANGELOG.md b/CHANGELOG.md index 4baabff..7192a6c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,23 @@ +### 2.6.0 2025-12-12 + +- Abandon des données consolidées par le MINEF, récupération des données à la source ([#151](https://github.com/ColinMaudry/decp-processing/issues/151)) + - Xmarchés + - AWS (officiel et legacy) + - PES marché (avant et après 2024) + - Dematis / e-marchespublics +- Ajout du champ `dureeRestanteMois` ([#135](https://github.com/ColinMaudry/decp-processing/issues/135)) +- Amélioration des noms des titulaires (personnes physiques et non-diffusibles) ([#145](https://github.com/ColinMaudry/decp-processing/issues/145)) +- Ajout de nombreux tests unitaires +- Amélioration de la gestion des modifications ([#148](https://github.com/ColinMaudry/decp-processing/issues/148)) +- Traitement des ressources en parallèle ([#113](https://github.com/ColinMaudry/decp-processing/issues/113)) +- Optimisation de la consommation de mémoire (matérialisation en parquet) ([#153](https://github.com/ColinMaudry/decp-processing/issues/153)) +- Résilience contre les erreurs pendant get_clean (seule la ressource échoue, pas tout le process) +- Mise en place d'un système de cache custom (parquet) +- Protection contre la publication par erreur sur data.gouv.fr ([ffaf0535](https://github.com/ColinMaudry/decp-processing/commit/ffaf0535)) +- Utilisation de polars 1.35.2 plutôt que 1.36.1 qui semble ne pas marcher avec polars-ds + ### 2.5.0 2025-11-21 -- Ajout de la durée restante dans le marché en mois ([#35](https://github.com/ColinMaudry/decp-processing/issues/135)) - Amélioration de la conso mémoire de la correction des titulaires ([#146](https://github.com/ColinMaudry/decp-processing/issues/146)) - Vérfication de la structude des données scrapées (AWS) - Gestion propre des erreurs 429 Too Many Redirects ([6fbd71e0](https://github.com/ColinMaudry/decp-processing/commit/6fbd71e0bca0534ee360dc172f3565607dac5bef)) diff --git a/README.md b/README.md index e5256d5..b3ad214 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # DECP processing -> version 2.5.0 ([notes de version](https://github.com/ColinMaudry/decp-processing/blob/main/CHANGELOG.md)) +> version 2.6.0 ([notes de version](https://github.com/ColinMaudry/decp-processing/blob/main/CHANGELOG.md)) Projet de traitement et de publication de meilleures données sur les marchés publics attribués en France. Vous pouvez consulter, filtrer et télécharger ces données sur le site [decp.info](https://decp.info). Enfin la section [À propos](https://decp.info/a-propos) décrit les objectifs du projet et regroupe toutes les informations clés. @@ -59,7 +59,7 @@ source .venv/bin/activate Installez les dépendances : ```bash -pip install . +pip install -e . ``` Pour les contributeurices, installez les dépendances de développement et l'auto-formatage au moment des commits : @@ -110,7 +110,7 @@ Déploiement des flows (exécution programmée de main ou ponctuelle de dev) : Le pré-traitement des données SIRENE doit être fait une fois pour que le traitement principal soit fonctionnel. ```bash -pytest tests/test_sirene_preprocess.py +python run_flow.py sirene_preprocess ``` Lancement du traitement principal (data_tables + decp.info) via un serveur prefect à usage unique diff --git a/data/source_datasets.json b/data/source_datasets.json index 42e24cc..8b77f8d 100644 --- a/data/source_datasets.json +++ b/data/source_datasets.json @@ -239,12 +239,6 @@ "owner_org_name": "ATEXO - DECP", "code": "atexo_mp_596280" }, - { - "id": "5cd57bf68b4c4179299eb0e9", - "name": "Données essentielles de la commande publique - fichiers consolidés", - "owner_org_name": "Ministère de l'économie, des Finances et de l'Industrie", - "code": "decp_minef" - }, { "id": "65f18014e9923e9e71f5f258", "name": "Données essentielles du profil acheteur Mégalis Bretagne - schéma arrêtés 2022", @@ -275,10 +269,46 @@ "code": "aws_marches-publics.info", "owner_org_name": "AWSolutions" }, + { + "id": "5cdb1722634f41416ffe90e2", + "name": "Données de marches-publics.info (AWS) - legacy", + "code": "aws_marches-publics.info_legacy", + "owner_org_name": "MINEF" + }, { "id": "68caf6b135f19236a4f37a32", "name": "Données de marches-publics.info (AWS) - scraping", "code": "scrap_marches-publics.info", "owner_org_name": "Colin Maudry" + }, + { + "id": "651eb149add2f419788c36ef", + "name": "Données essentielles des marchés publics - Xmarchés", + "code": "xmarches", + "owner_org_name": "SPL-XDEMAT" + }, + { + "id": "65cf3c55ec910b5a8e2fc084", + "name": "Données essentielles PES marché depuis 2024", + "code": "pes_marche_2024", + "owner_org_name": "MINEF" + }, + { + "id": "6932eab9f5ef60c27eba22ff", + "name": "Données essentielles PES marché avant 2024 (legacy)", + "code": "pes_marche_legacy", + "owner_org_name": "Colin Maudry" + }, + { + "id": "5c0a7845634f4139b2ee8883", + "name": "Données essentielles des marchés publics - e-marchespublics.com", + "code": "e-marchespublics.com_dematis", + "owner_org_name": "Dematis" + }, + { + "id": "6780e55f813a85efbd2c6557", + "name": "Les Personnes placées sous main de justice - IDF 2024", + "code": "ppsmj", + "owner_org_name": "Yael Siksik" } ] diff --git a/pyproject.toml b/pyproject.toml index d207460..73748ee 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,11 +9,12 @@ authors = [ dependencies = [ "python-dotenv", "pandas", # nécessaire pour l'écriture en base de données - "polars==1.33.1", + "polars==1.35.2", "pyarrow", "frictionless", "ipykernel", - "prefect==3.4.17", + "prefect==3.6.6", + "prefect[email]==3.6.6", "orjson", "ijson==3.4.0", "lxml", @@ -29,6 +30,7 @@ dev = [ "pre-commit", "pytest-env", "pytest", + "pytest-cov" ] [tool.pytest.ini_options] @@ -40,9 +42,10 @@ testpaths = [ ] env = [ "DATASETS_REFERENCE_FILEPATH=tests/data/source_datasets_test.json", + "SIRENE_DATA_DIR=tests/data/sirene", "PREFECT_API_URL=", "DECP_PROCESSING_PUBLISH=", - "PREFECT_TASKS_REFRESH_CACHE=true" + "DECP_USE_CACHE=false" ] addopts = "-p no:warnings" diff --git a/src/config.py b/src/config.py index 8d8eac2..c8a38ed 100644 --- a/src/config.py +++ b/src/config.py @@ -10,8 +10,6 @@ from dotenv import find_dotenv, load_dotenv from ijson import sendable_list -from src.schemas import SCHEMA_MARCHE_2019, SCHEMA_MARCHE_2022 - dotenv_path = find_dotenv() if dotenv_path == "": print("Création du fichier .env à partir de template.env") @@ -71,6 +69,9 @@ def make_path_from_env(env: str, alternative_path: Path) -> Path: # Clé d'API data.gouv.fr DATAGOUVFR_API_KEY = os.getenv("DATAGOUVFR_API_KEY", "") +# URL API Prefect +PREFECT_API_URL = os.getenv("PREFECT_API_URL") + # Dossier racine BASE_DIR = make_path_from_env("DECP_BASE_DIR", Path(__file__).absolute().parent.parent) print(f"{'BASE_DIR':<40}", BASE_DIR) @@ -80,6 +81,9 @@ def make_path_from_env(env: str, alternative_path: Path) -> Path: DATA_DIR.mkdir(exist_ok=True, parents=True) print(f"{'DATA_DIR':<40}", DATA_DIR) +RESOURCE_CACHE_DIR = DATA_DIR / "resource_cache" +RESOURCE_CACHE_DIR.mkdir(exist_ok=True, parents=True) + DIST_DIR = make_path_from_env("DECP_DIST_DIR", BASE_DIR / "dist") DIST_DIR.mkdir(exist_ok=True, parents=True, mode=777) print(f"{'DIST_DIR':<40}", DIST_DIR) @@ -97,7 +101,15 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path: SIRENE_DATA_PARENT_DIR = make_path_from_env("SIRENE_DATA_PARENT_DIR", DATA_DIR) -SIRENE_DATA_DIR = make_sirene_data_dir(SIRENE_DATA_PARENT_DIR) + +# SIRENE_DATA_DIR ne doit être spécifié que pour les tests. Laisser vide dans .env et laisser make_sirene_data_dir +# le déterminer +SIRENE_DATA_DIR = os.getenv( + "SIRENE_DATA_DIR", make_sirene_data_dir(SIRENE_DATA_PARENT_DIR) +) +if isinstance(SIRENE_DATA_DIR, str): + SIRENE_DATA_DIR = Path(os.path.join(BASE_DIR, SIRENE_DATA_DIR)) + # SIRENE_DATA_DIR on ne le crée que si nécessaire, dans flows.py print(f"{'SIRENE_DATA_PARENT_DIR':<40}", SIRENE_DATA_PARENT_DIR) print(f"{'SIRENE_DATA_DIR':<40}", SIRENE_DATA_DIR) @@ -113,6 +125,8 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path: SCRAPING_TARGET = os.getenv("SCRAPING_TARGET") print(f"{'SCRAPING_TARGET':<40}", SCRAPING_TARGET) +# Lecture ou non des ressource en cache +DECP_USE_CACHE = os.getenv("DECP_USE_CACHE", "false").lower() == "true" # Dossier de stockage des résultats de tâches et du cache # https://docs.prefect.io/v3/advanced/results#default-persistence-configuration @@ -204,8 +218,4 @@ class DecpFormat: coroutine_ijson: Coroutine | None = None -DECP_FORMAT_2019 = DecpFormat("DECP 2019", SCHEMA_MARCHE_2019, "marches") -DECP_FORMAT_2022 = DecpFormat("DECP 2022", SCHEMA_MARCHE_2022, "marches.marche") -DECP_FORMATS = [DECP_FORMAT_2019, DECP_FORMAT_2022] - print("") diff --git a/src/flows/decp_processing.py b/src/flows/decp_processing.py index 39862c4..56db3dd 100644 --- a/src/flows/decp_processing.py +++ b/src/flows/decp_processing.py @@ -5,8 +5,11 @@ import polars.selectors as cs from prefect import flow from prefect.artifacts import create_table_artifact +from prefect.context import get_run_context from prefect.task_runners import ConcurrentTaskRunner +from prefect_email import EmailServerCredentials, email_send_message +from config import PREFECT_API_URL from src.config import ( BASE_DF_COLUMNS, BASE_DIR, @@ -14,6 +17,7 @@ DECP_PROCESSING_PUBLISH, DIST_DIR, MAX_PREFECT_WORKERS, + RESOURCE_CACHE_DIR, SIRENE_DATA_DIR, TRACKED_DATASETS, ) @@ -21,22 +25,22 @@ from src.tasks.dataset_utils import list_resources from src.tasks.enrich import enrich_from_sirene from src.tasks.get import get_clean -from src.tasks.output import generate_final_schema, save_to_files +from src.tasks.output import generate_final_schema, sink_to_files from src.tasks.publish import publish_to_datagouv from src.tasks.transform import ( add_duree_restante, calculate_naf_cpv_matching, - concat_decp_json, + concat_parquet_files, sort_columns, ) -from src.tasks.utils import generate_stats, remove_unused_cache +from src.tasks.utils import full_resource_name, generate_stats, remove_unused_cache @flow( log_prints=True, task_runner=ConcurrentTaskRunner(max_workers=MAX_PREFECT_WORKERS), ) -def decp_processing(enable_cache_removal: bool = False): +def decp_processing(enable_cache_removal: bool = True): print(f"🚀 Début du flow decp-processing dans base dir {BASE_DIR} ") print("Liste de toutes les ressources des datasets...") @@ -45,15 +49,48 @@ def decp_processing(enable_cache_removal: bool = False): # Initialisation du tableau des artifacts de ressources resources_artifact = [] - # Traitement parallèle des ressources - futures = [ - get_clean.submit(resource, resources_artifact) - for resource in resources - if resource["filesize"] > 100 - ] - dfs: list[pl.DataFrame] = [f.result() for f in futures if f.result() is not None] + # Liste des ressources en cache (checksums) + available_parquet_files = set(os.listdir(RESOURCE_CACHE_DIR)) - if DECP_PROCESSING_PUBLISH: + # Traitement parallèle des ressources par lots pour éviter la surcharge mémoire + batch_size = 500 + parquet_files = [] + + # Filtrer les ressources à traiter + resources_to_process = [r for r in resources if r["filesize"] > 100] + + # Afin d'être sûr que je ne publie pas par erreur un jeu de données de test + decp_publish = DECP_PROCESSING_PUBLISH and len(resources_to_process) > 5000 + + for i in range(0, len(resources_to_process), batch_size): + batch = resources_to_process[i : i + batch_size] + print( + f"🗃️ Traitement du lot {i // batch_size + 1} / {len(resources_to_process) // batch_size + 1}" + ) + + futures = {} + for resource in batch: + future = get_clean.submit( + resource, resources_artifact, available_parquet_files + ) + futures[future] = full_resource_name(resource) + + for f in futures: + try: + result = f.result() + if result is not None: + parquet_files.append(result) + except Exception as e: + resource_name = futures[f] + print( + f"❌ Erreur de traitement de {resource_name} ({type(e).__name__}):" + ) + print(e) + + # Nettoyage explicite + futures.clear() + + if decp_publish: create_table_artifact( table=resources_artifact, key="datagouvfr-json-resources", @@ -62,54 +99,74 @@ def decp_processing(enable_cache_removal: bool = False): del resources_artifact print("Fusion des dataframes...") - df: pl.DataFrame = concat_decp_json(dfs) - del dfs + lf: pl.LazyFrame = concat_parquet_files(parquet_files) print("Ajout des données SIRENE...") # Preprocessing des données SIRENE si : # - le dossier n'existe pas encore (= les données n'ont pas déjà été preprocessed ce mois-ci) # - on est au moins le 5 du mois (pour être sûr que les données SIRENE ont été mises à jour sur data.gouv.fr) - print(SIRENE_DATA_DIR) if not SIRENE_DATA_DIR.exists(): sirene_preprocess() - lf: pl.LazyFrame = enrich_from_sirene(df.lazy()) - - print("Ajout de la colonne 'dureeRestanteMois'...") - lf = add_duree_restante(lf) - - df: pl.DataFrame = lf.collect(engine="streaming") + lf: pl.LazyFrame = enrich_from_sirene(lf) # Réinitialisation de DIST_DIR if os.path.exists(DIST_DIR): shutil.rmtree(DIST_DIR) os.makedirs(DIST_DIR) + sink_to_files(lf, DIST_DIR / "decp", file_format="parquet") + lf: pl.LazyFrame = pl.scan_parquet(DIST_DIR / "decp.parquet") + + print("Ajout de la colonne 'dureeRestanteMois'...") + lf = add_duree_restante(lf) + print("Génération des probabilités NAF/CPV...") - calculate_naf_cpv_matching(df) - df = df.drop(cs.starts_with("activite")) + calculate_naf_cpv_matching(lf) + lf = lf.drop(cs.starts_with("activite")) print("Génération de l'artefact (statistiques) sur le base df...") - generate_stats(df) + generate_stats(lf) print("Génération du schéma et enregistrement des DECP aux formats CSV, Parquet...") - df: pl.DataFrame = sort_columns(df, BASE_DF_COLUMNS) - generate_final_schema(df) - save_to_files(df, DIST_DIR / "decp") - del df + lf: pl.LazyFrame = sort_columns(lf, BASE_DF_COLUMNS) + generate_final_schema(lf) + sink_to_files(lf, DIST_DIR / "decp") # Base de données SQLite dédiée aux activités du Datalab d'Anticor # Désactivé pour l'instant https://github.com/ColinMaudry/decp-processing/issues/124 # make_data_tables() - if DECP_PROCESSING_PUBLISH: + if decp_publish: print("Publication sur data.gouv.fr...") publish_to_datagouv() else: print("Publication sur data.gouv.fr désactivée.") - # Suppression des fichiers de cache inutilisés if enable_cache_removal: + print("Suppression des fichiers de cache inutilisés...") remove_unused_cache() print("☑️ Fin du flow principal decp_processing.") + + +@sirene_preprocess.on_failure +@decp_processing.on_failure +def notify_exception_by_email(flow, flow_run, state): + if PREFECT_API_URL: + context = get_run_context() + flow_run_name = context.flow_run.name + email_server_credentials = EmailServerCredentials.load("email-notifier") + message = ( + f"Your job {flow_run.name} entered {state.name} " + f"with message:\n\n" + f"See \n\n" + f"Scheduled start: {flow_run.expected_start_time}" + ) + + email_send_message( + email_server_credentials=email_server_credentials, + subject=f"Flow run {flow_run_name!r} failed", + msg=message, + email_to=email_server_credentials.username, + ) diff --git a/src/flows/sirene_preprocess.py b/src/flows/sirene_preprocess.py index c618c9f..9b12094 100644 --- a/src/flows/sirene_preprocess.py +++ b/src/flows/sirene_preprocess.py @@ -3,8 +3,8 @@ from src.config import SIRENE_DATA_DIR from src.flows.get_cog import get_cog -from src.tasks.get import get_etablissements -from src.tasks.transform import get_prepare_unites_legales, prepare_etablissements +from src.tasks.get import get_etablissements, get_unite_legales +from src.tasks.transform import prepare_etablissements from src.tasks.utils import create_sirene_data_dir @@ -26,13 +26,17 @@ def sirene_preprocess(): processed_ul_parquet_path = SIRENE_DATA_DIR / "unites_legales.parquet" if not processed_ul_parquet_path.exists(): print("Téléchargement et préparation des unités légales...") - get_prepare_unites_legales(processed_ul_parquet_path) + get_unite_legales(processed_ul_parquet_path) + else: + print(processed_ul_parquet_path, " existe, skipping.") # préparer les données établissements processed_etab_parquet_path = SIRENE_DATA_DIR / "etablissements.parquet" if not processed_etab_parquet_path.exists(): print("Téléchargement et préparation des établissements...") lf = get_etablissements() - prepare_etablissements(lf, processed_etab_parquet_path) + prepare_etablissements(lf).sink_parquet(processed_etab_parquet_path) + else: + print(processed_etab_parquet_path, " existe, skipping.") print("☑️ Fin du flow sirene_preprocess.") diff --git a/src/tasks/clean.py b/src/tasks/clean.py index 89177cb..54a567a 100644 --- a/src/tasks/clean.py +++ b/src/tasks/clean.py @@ -2,16 +2,23 @@ import re import polars as pl +from polars import selectors as cs from src.config import DecpFormat from src.tasks.transform import ( explode_titulaires, process_modifications, - process_string_lists, ) def clean_decp(lf: pl.LazyFrame, decp_format: DecpFormat) -> pl.LazyFrame: + """ + The bulk of Polars data cleaning is grouped here, with the exception of process_modifications and explode_titulaires that are not + cleaning tasks. + :param lf: + :param decp_format: + :return: + """ # # CLEAN DATA # @@ -41,11 +48,16 @@ def clean_decp(lf: pl.LazyFrame, decp_format: DecpFormat) -> pl.LazyFrame: # TODO: à déplacer autre part, dans transform lf = lf.with_columns((pl.col("acheteur_id") + pl.col("id")).alias("uid")) + # Application des modifications + # le plus tôt possible pour que les fonctions suivantes clean les + # champs modifiés (dateNotification, datePublicationDonnnes, montant, titulaires, dureeMois) + lf = process_modifications(lf) + # Montants # Certains marchés ont des montants qui posent problème, donc on les met à 12,311111111 milliards (pour les retrouver facilement) # ex : 221300015002472020F00075, 1.0E17 lf = lf.with_columns( - pl.when(pl.col("montant").str.len_chars() > 11) + pl.when(pl.col("montant").str.split(".").list.get(0).str.len_chars() > 11) .then(pl.lit("12311111111")) .otherwise(pl.col("montant")) .alias("montant") @@ -72,13 +84,22 @@ def clean_decp(lf: pl.LazyFrame, decp_format: DecpFormat) -> pl.LazyFrame: pl.col(["datePublicationDonnees", "dateNotification"]) .str.replace_many(date_replacements) .cast(pl.Utf8) + .name.keep() + ) + + # suppression des suffixes de fuseau horaire + lf = lf.with_columns( + pl.col(["datePublicationDonnees", "dateNotification"]) + .str.split("+") + .list[0] + .name.keep() ) # Nature lf = lf.with_columns( - pl.col("nature").str.replace_many( - {"Marche": "Marché", "subsequent": "subséquent"} - ) + pl.col("nature") + .str.replace_many({"Marche": "Marché", "subsequent": "subséquent"}) + .alias("nature") ) # Codes CPV, suppression du caractères de contrôle ("-[0-9]$") @@ -93,9 +114,6 @@ def clean_decp(lf: pl.LazyFrame, decp_format: DecpFormat) -> pl.LazyFrame: # Normalisation des titulaires lf = clean_titulaires(lf, decp_format) - # Application des modifications - lf = process_modifications(lf) - # Explosion des titulaires lf = explode_titulaires(lf, decp_format) @@ -195,9 +213,11 @@ def clean_titulaires(lf: pl.LazyFrame, decp_format: DecpFormat) -> pl.LazyFrame: titulaire_typeIdentifiant=pl.element().struct.field("typeIdentifiant"), ) - for col in ["titulaires", "modification_titulaires"]: + # Skip processing if column dtype is Null (all values are null) + # This happens when modification_titulaires has no actual data + if lf.collect_schema()["titulaires"] != pl.Null: lf = lf.with_columns( - pl.col(col) + pl.col("titulaires") .list.eval(expr_titulaire) .list.eval( # Filtrer les éléments où id ET typeIdentifiant sont null @@ -208,24 +228,32 @@ def clean_titulaires(lf: pl.LazyFrame, decp_format: DecpFormat) -> pl.LazyFrame: .is_not_null() ) ) - .alias(col) + .alias("titulaires") ) # Remplacer les listes de titulaires vides par null - lf = lf.with_columns( - [ - pl.when(pl.col(col).list.len() == 0) - .then(None) - .otherwise(pl.col(col)) - .alias(col) - for col in ["titulaires", "modification_titulaires"] - ] - ) + # Only process columns that have List dtype + + if lf.collect_schema()["titulaires"] != pl.Null: + lf = lf.with_columns( + [ + pl.when(pl.col("titulaires").list.len() == 0) + .then(None) + .otherwise(pl.col("titulaires")) + .alias("titulaires") + ] + ) return lf def fix_data_types(lf: pl.LazyFrame) -> pl.LazyFrame: + """ + To enable easier data ingestion, everything is initially cast as strings... until here. This + function casts the right datatype for each column. + :param lf: + :return: + """ numeric_dtypes = { "dureeMois": pl.Int16, # "dureeMoisActeSousTraitance": pl.Int16, @@ -234,10 +262,10 @@ def fix_data_types(lf: pl.LazyFrame) -> pl.LazyFrame: "montant": pl.Float64, # "montantActeSousTraitance": pl.Float64, # "montantModificationActeSousTraitance": pl.Float64, - "tauxAvance": pl.Float64, + "tauxAvance": pl.Float32, # "variationPrixActeSousTraitance": pl.Float64, - "origineFrance": pl.Float64, - "origineUE": pl.Float64, + "origineFrance": pl.Float32, + "origineUE": pl.Float32, "modification_id": pl.Int16, } @@ -287,3 +315,33 @@ def fix_data_types(lf: pl.LazyFrame) -> pl.LazyFrame: ] ) return lf + + +def process_string_lists(lf: pl.LazyFrame): + string_lists_col_to_rename = [ + "considerationsSociales_considerationSociale", + "considerationsEnvironnementales_considerationEnvironnementale", + "techniques_technique", + "typesPrix_typePrix", + "modalitesExecution_modaliteExecution", + ] + columns = lf.collect_schema().names() + + # Pour s'assurer qu'on renomme pas une colonne si le bon nom de colonne existe déjà + for bad_col in string_lists_col_to_rename: + new_col = bad_col.split("_")[0] + if new_col in columns and bad_col in columns: + lf = lf.with_columns( + pl.when(pl.col(new_col).list.len() == 0) + .then(pl.col(bad_col)) + .otherwise(pl.col(new_col)) + .alias(new_col) + ) + lf = lf.drop(bad_col) + elif new_col not in columns and bad_col in columns: + lf = lf.rename({bad_col: new_col}) + + # Et on remplace la liste Python par une liste séparée par des virgules + lf = lf.with_columns(cs.by_dtype(pl.List(pl.String)).list.join(", ").name.keep()) + + return lf diff --git a/src/tasks/enrich.py b/src/tasks/enrich.py index dfd63c9..9fad829 100644 --- a/src/tasks/enrich.py +++ b/src/tasks/enrich.py @@ -21,6 +21,27 @@ def add_etablissement_data( lf_sirets = lf_sirets.join( lf_etablissements, how="inner", left_on=siret_column, right_on="siret" ) + + # On ne prend pas l'activité des acheteurs + if type_siret == "acheteur": + lf_sirets = lf_sirets.drop(cs.starts_with("activite_")) + + # Si il y a un etablissement_nom (Enseigne1Etablissement ou denominationUsuelleEtablissement), + # on l'ajoute au nom de l'organisme, entre parenthèses + lf_sirets = lf_sirets.with_columns( + pl.when(pl.col("etablissement_nom").is_not_null()) + .then( + pl.concat_str( + pl.col(f"{type_siret}_nom"), + pl.lit(" ("), + pl.col("etablissement_nom"), + pl.lit(")"), + ) + ) + .otherwise(pl.col(f"{type_siret}_nom")) + .alias(f"{type_siret}_nom") + ).drop("etablissement_nom") + lf_sirets = lf_sirets.rename( { "latitude": f"{type_siret}_latitude", @@ -61,15 +82,12 @@ def enrich_from_sirene(lf: pl.LazyFrame): lf_etablissements = pl.scan_parquet(SIRENE_DATA_DIR / "etablissements.parquet") lf_unites_legales = pl.scan_parquet(SIRENE_DATA_DIR / "unites_legales.parquet") + lf_base = lf.clone() + # DONNÉES SIRENE ACHETEURS print("Extraction des SIRET des acheteurs...") - lf_sirets_acheteurs = extract_unique_acheteurs_siret(lf.clone()) - - print("Ajout des données établissements (acheteurs)...") - lf_sirets_acheteurs = add_etablissement_data( - lf_sirets_acheteurs, lf_etablissements, "acheteur_id", "acheteur" - ) + lf_sirets_acheteurs = extract_unique_acheteurs_siret(lf_base) print("Ajout des données unités légales (acheteurs)...") lf_sirets_acheteurs = add_unite_legale_data( @@ -79,22 +97,21 @@ def enrich_from_sirene(lf: pl.LazyFrame): type_siret="acheteur", ) - lf = lf.join(lf_sirets_acheteurs, how="left", on="acheteur_id") - - del lf_sirets_acheteurs + print("Ajout des données établissements (acheteurs)...") + lf_sirets_acheteurs = add_etablissement_data( + lf_sirets_acheteurs, lf_etablissements, "acheteur_id", "acheteur" + ) - # print("Construction du champ acheteur_nom à partir des données SIRENE...") - # lf_sirets_acheteurs = make_acheteur_nom(lf_sirets_acheteurs) + # Matérialisation de sirets_acheteurs pour rompre + # le cercle de dépendances du lf + acheteurs_path = SIRENE_DATA_DIR / "temp_enrich_acheteurs.parquet" + lf_sirets_acheteurs.sink_parquet(acheteurs_path) + lf_sirets_acheteurs = pl.scan_parquet(acheteurs_path) # DONNÉES SIRENE TITULAIRES print("Extraction des SIRET des titulaires...") - lf_sirets_titulaires = extract_unique_titulaires_siret(lf.clone()) - - print("Ajout des données établissements (titulaires)...") - lf_sirets_titulaires = add_etablissement_data( - lf_sirets_titulaires, lf_etablissements, "titulaire_id", "titulaire" - ) + lf_sirets_titulaires = extract_unique_titulaires_siret(lf_base) print("Ajout des données unités légales (titulaires)...") lf_sirets_titulaires = add_unite_legale_data( @@ -104,6 +121,21 @@ def enrich_from_sirene(lf: pl.LazyFrame): type_siret="titulaire", ) + print("Ajout des données établissements (titulaires)...") + lf_sirets_titulaires = add_etablissement_data( + lf_sirets_titulaires, lf_etablissements, "titulaire_id", "titulaire" + ) + + # # Matérialisation de sirets_titulaires pour rompre + # le cercle de dépendances du lf + titulaires_path = SIRENE_DATA_DIR / "temp_enrich_titulaires.parquet" + lf_sirets_titulaires.sink_parquet(titulaires_path) + lf_sirets_titulaires = pl.scan_parquet(titulaires_path) + + # JOINTURES + + lf = lf.join(lf_sirets_acheteurs, how="left", on="acheteur_id") + # En joignant en utilisant à la fois le SIRET et le typeIdentifiant, on s'assure qu'on ne joint pas sur # des id de titulaires non-SIRET lf = lf.join( diff --git a/src/tasks/get.py b/src/tasks/get.py index 954dd7e..e32c60d 100644 --- a/src/tasks/get.py +++ b/src/tasks/get.py @@ -1,5 +1,4 @@ import concurrent.futures -import datetime import tempfile from collections.abc import Iterator from functools import partial @@ -15,16 +14,17 @@ from prefect import task from prefect.transactions import transaction +from config import SIRENE_UNITES_LEGALES_URL from src.config import ( - CACHE_EXPIRATION_TIME_HOURS, - DECP_FORMAT_2022, - DECP_FORMATS, + DATA_DIR, DECP_PROCESSING_PUBLISH, - DIST_DIR, + DECP_USE_CACHE, HTTP_CLIENT, HTTP_HEADERS, + RESOURCE_CACHE_DIR, DecpFormat, ) +from src.schemas import SCHEMA_MARCHE_2019, SCHEMA_MARCHE_2022 from src.tasks.clean import ( clean_decp, clean_invalid_characters, @@ -32,10 +32,11 @@ ) from src.tasks.output import sink_to_files from src.tasks.utils import ( + full_resource_name, gen_artifact_row, - get_clean_cache_key, stream_replace_bytestring, ) +from tasks.transform import prepare_unites_legales @task(retries=3, retry_delay_seconds=3) @@ -61,16 +62,15 @@ def stream_get(url: str, chunk_size=1024**2): # chunk_size en octets (1 Mo par def get_resource( r: dict, resources_artifact: list[dict] | list ) -> tuple[pl.LazyFrame | None, DecpFormat | None]: - decp_formats: list[DecpFormat] = DECP_FORMATS + if DECP_PROCESSING_PUBLISH is False: + print(f"➡️ {full_resource_name(r)}") - print(f"➡️ {r['ori_filename']} ({r['dataset_name']})") - - output_path = DIST_DIR / "get" / r["filename"] - output_path.parent.mkdir(exist_ok=True) + output_path = DATA_DIR / "get" / r["filename"] + output_path.parent.mkdir(exist_ok=True, parents=True) url = r["url"] file_format = r["format"] if file_format == "json": - fields, decp_format = json_stream_to_parquet(url, output_path, decp_formats) + fields, decp_format = json_stream_to_parquet(url, output_path, r) elif file_format == "xml": try: fields, decp_format = xml_stream_to_parquet( @@ -80,11 +80,9 @@ def get_resource( fields, decp_format = xml_stream_to_parquet( url, output_path, fix_chars=True ) - print(f"♻️ {r['ori_filename']} nettoyé et traité") + print(f"♻️ {full_resource_name(r)} nettoyé et traité") else: - print( - f"▶️ Format de fichier non supporté : {file_format} ({r['dataset_name']})" - ) + print(f"▶️ Format de fichier non supporté : {full_resource_name(r)}") return None, None if decp_format is None: @@ -117,23 +115,26 @@ def get_resource( return lf, decp_format -def find_json_decp_format(chunk, decp_formats): +def find_json_decp_format(chunk, decp_formats, resource: dict): for decp_format in decp_formats: decp_format.coroutine_ijson.send(chunk) if len(decp_format.liste_marches_ijson) > 0: # Le parser a trouvé au moins un marché correspondant à ce format, donc on a # trouvé le bon format. return decp_format - print("⚠️ Pas de match trouvé parmis les schémas passés") + print( + f"⚠️ Pas de match trouvé parmis les schémas passés : {full_resource_name(resource)}" + ) return None @task(persist_result=False, log_prints=True) def json_stream_to_parquet( - url: str, output_path: Path, decp_formats: list[DecpFormat] | None = None + url: str, output_path: Path, resource: dict ) -> tuple[set, DecpFormat or None]: - if decp_formats is None: - decp_formats: list[DecpFormat] = DECP_FORMATS + decp_format_2019 = DecpFormat("DECP 2019", SCHEMA_MARCHE_2019, "marches") + decp_format_2022 = DecpFormat("DECP 2022", SCHEMA_MARCHE_2022, "marches.marche") + decp_formats = [decp_format_2019, decp_format_2022] fields = set() for decp_format in decp_formats: @@ -172,7 +173,7 @@ def json_stream_to_parquet( print(f"⚠️ Flux vide pour {url}") return set(), None - decp_format = find_json_decp_format(chunk, decp_formats) + decp_format = find_json_decp_format(chunk, decp_formats, resource) if decp_format is None: return set(), None @@ -207,6 +208,8 @@ def xml_stream_to_parquet( ) -> tuple[set, DecpFormat]: """Uniquement utilisé pour les données publiées par l'AIFE.""" + decp_format_2022 = DecpFormat("DECP 2022", SCHEMA_MARCHE_2022, "marches.marche") + fields = set() parser = etree.XMLPullParser(tag="marche", recover=True) with tempfile.NamedTemporaryFile( @@ -218,11 +221,11 @@ def xml_stream_to_parquet( parser.feed(chunk) for _, elem in parser.read_events(): marche = parse_element(elem) - new_fields = write_marche_rows(marche, tmp_file, DECP_FORMAT_2022) + new_fields = write_marche_rows(marche, tmp_file, decp_format_2022) fields = fields.union(new_fields) - lf = pl.scan_ndjson(tmp_file.name, schema=DECP_FORMAT_2022.schema) + lf = pl.scan_ndjson(tmp_file.name, schema=decp_format_2022.schema) sink_to_files(lf, output_path, file_format="parquet") - return fields, DECP_FORMAT_2022 + return fields, decp_format_2022 # Générée par la LLM Euria, développée par Infomaniak @@ -315,8 +318,8 @@ def yield_modifications(row: dict, separator="_") -> Iterator[dict] or None: # directement à un dict contenant les métadonnées liées à une modification. if isinstance(raw_mods, dict): raw_mods = [raw_mods] - - raw_mods = [] if raw_mods is None else raw_mods + elif isinstance(raw_mods, str) or raw_mods is None: + raw_mods = [] mods = [{}] + raw_mods for i, mod in enumerate(mods): @@ -368,6 +371,8 @@ def get_etablissements() -> pl.LazyFrame: "longitude": pl.Float64, "activitePrincipaleEtablissement": pl.String, "nomenclatureActivitePrincipaleEtablissement": pl.String, + "enseigne1Etablissement": pl.String, + "denominationUsuelleEtablissement": pl.String, } columns = list(schema.keys()) @@ -386,34 +391,28 @@ def get_etablissements() -> pl.LazyFrame: hrefs.append(base_url + href) # Fonction de traitement pour un fichier - def process_file(_href: str): + def get_process_file(_href: str): print(_href.split("/")[-1]) try: response = http_client.get( - _href, headers=HTTP_HEADERS, timeout=10 + _href, headers=HTTP_HEADERS, timeout=20 ).raise_for_status() except (HTTPStatusError, TimeoutException) as err: print(err) print("Nouvel essai...") response = http_client.get( - _href, headers=HTTP_HEADERS, timeout=10 + _href, headers=HTTP_HEADERS, timeout=20 ).raise_for_status() content = response.content lff = pl.scan_csv(content, schema_overrides=schema) lff = lff.select(columns) - lff = lff.with_columns( - [ - pl.col("codeCommuneEtablissement").str.pad_start(5, "0"), - pl.col("siret").str.pad_start(14, "0"), - ] - ) return lff # Traitement en parrallèle avec 8 threads lfs = [] with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor: - futures = [executor.submit(process_file, href) for href in hrefs] + futures = [executor.submit(get_process_file, href) for href in hrefs] for future in concurrent.futures.as_completed(futures): try: lf = future.result() @@ -440,21 +439,46 @@ def get_insee_cog_data(url, schema_overrides, columns) -> pl.DataFrame: @task( log_prints=True, - persist_result=True, - cache_expiration=datetime.timedelta(hours=CACHE_EXPIRATION_TIME_HOURS), - cache_key_fn=get_clean_cache_key, + # persist_result=True, + # cache_expiration=datetime.timedelta(hours=CACHE_EXPIRATION_TIME_HOURS), + # cache_key_fn=get_clean_cache_key, ) -def get_clean(resource, resources_artifact: list) -> pl.DataFrame or None: - # Récupération des données source... +def get_clean( + resource, resources_artifact: list, available_parquet_files: set +) -> pl.DataFrame or None: with transaction(): - lf, decp_format = get_resource(resource, resources_artifact) - - # Nettoyage des données source et typage des colonnes... - # si la ressource est dans un format supporté - if lf is not None: - lf = clean_decp(lf, decp_format) - df = lf.collect(engine="streaming") + checksum = resource["checksum"] + parquet_path = RESOURCE_CACHE_DIR / f"{checksum}" + + # Si la ressource n'est pas en cache ou que l'utilisation du cache est désactivée + if ( + DECP_USE_CACHE is False + or f"{checksum}.parquet" not in available_parquet_files + ): + # Récupération des données source... + lf, decp_format = get_resource(resource, resources_artifact) + + # Nettoyage des données source et typage des colonnes... + # si la ressource est dans un format supporté + if lf is not None and not Path(parquet_path).exists(): + lf: pl.LazyFrame = clean_decp(lf, decp_format) + sink_to_files( + lf, parquet_path, file_format="parquet", compression="zstd" + ) + return parquet_path.with_suffix(".parquet") + else: + return None else: - df = None - - return df + # Le fichier parquet est déjà disponible pour ce checksum + print(f"👍 Ressource déjà en cache : {resource['dataset_code']}") + return parquet_path.with_suffix(".parquet") + + +@task +def get_unite_legales(processed_parquet_path): + print("Téléchargement des données unité légales et sélection des colonnes...") + ( + pl.scan_parquet(SIRENE_UNITES_LEGALES_URL) + .pipe(prepare_unites_legales) + .sink_parquet(processed_parquet_path) + ) diff --git a/src/tasks/output.py b/src/tasks/output.py index 13351a2..4e22f9d 100644 --- a/src/tasks/output.py +++ b/src/tasks/output.py @@ -23,13 +23,34 @@ def save_to_files(df: pl.DataFrame, path: Path, file_format=None): df.write_csv(f"{path}.csv") -def sink_to_files(lf: pl.LazyFrame, path: str | Path, file_format=None): +def sink_to_files( + lf: pl.LazyFrame, path: str | Path, file_format=None, compression: str = "zstd" +): + path = Path(path) if file_format is None: file_format = ["csv", "parquet"] + if "parquet" in file_format: - lf.sink_parquet(f"{path}.parquet") - if "csv" in file_format: - lf.sink_csv(f"{path}.csv") + # Write to a temporary file first to avoid read-write conflicts + tmp_path = path.with_suffix(".parquet.tmp") + + lf.sink_parquet(tmp_path, compression=compression, engine="streaming") + + if tmp_path.exists(): + tmp_path.rename(path.with_suffix(".parquet")) + + # Utilisation du parquet plutôt que de relaculer le plan de requête du LazyFrame + if "csv" in file_format: + pl.scan_parquet(f"{path}.parquet").sink_csv( + f"{path}.csv", engine="streaming" + ) + + elif "csv" in file_format: + # Fallback if only CSV is requested + lf.sink_csv(f"{path}.csv", engine="streaming") + + # Si ça peut réduire un peu l'empreinte mémoire + del lf def save_to_postgres(df: pl.DataFrame, table_name: str): @@ -108,12 +129,15 @@ def save_to_databases( # save_to_postgres(df, table_name) -def generate_final_schema(df): +def generate_final_schema(lf): """Création d'un TableSchema pour décrire les données publiées""" - df_schema = [] + + schema = lf.collect_schema() + frictonless_schema = [] polars_frictionless_mapping = { "String": "string", + "Float32": "number", "Float64": "number", "Int16": "integer", "Boolean": "boolean", @@ -121,9 +145,9 @@ def generate_final_schema(df): } # conversion en dict sérialisable en JSON - for col in df.columns: - polars_type = df.schema[col].__str__() - df_schema.append( + for col in schema.keys(): + polars_type = schema[col].__str__() + frictonless_schema.append( {"name": col, "type": polars_frictionless_mapping[polars_type]} ) @@ -134,7 +158,7 @@ def generate_final_schema(df): # fusion des deux # https://www.paigeniedringhaus.com/blog/filter-merge-and-update-python-lists-based-on-object-attributes#merge-two-lists-together-by-matching-object-keys merged_fields = groupby( - sorted(base_json["fields"] + df_schema, key=itemgetter("name")), + sorted(base_json["fields"] + frictonless_schema, key=itemgetter("name")), itemgetter("name"), ) merged_schema = {"fields": [dict(ChainMap(*g)) for k, g in merged_fields]} diff --git a/src/tasks/transform.py b/src/tasks/transform.py index 72611e5..7a8105d 100644 --- a/src/tasks/transform.py +++ b/src/tasks/transform.py @@ -3,39 +3,10 @@ import polars as pl import polars.selectors as cs -from prefect import task -from src.config import DATA_DIR, DIST_DIR, SIRENE_UNITES_LEGALES_URL, DecpFormat +from src.config import DATA_DIR, DIST_DIR, DecpFormat from src.tasks.output import save_to_files - - -def process_string_lists(lf: pl.LazyFrame): - string_lists_col_to_rename = [ - "considerationsSociales_considerationSociale", - "considerationsEnvironnementales_considerationEnvironnementale", - "techniques_technique", - "typesPrix_typePrix", - "modalitesExecution_modaliteExecution", - ] - columns = lf.collect_schema().names() - - # Pour s'assurer qu'on renomme pas une colonne si le bon nom de colonne existe déjà - for bad_col in string_lists_col_to_rename: - new_col = bad_col.split("_")[0] - if new_col in columns and bad_col in columns: - lf = lf.with_columns( - pl.when(pl.col(new_col).len() == 0) - .then(pl.col(bad_col)) - .otherwise(pl.col(new_col)) - ) - lf = lf.drop(bad_col) - elif new_col not in columns and bad_col in columns: - lf = lf.rename({bad_col: new_col}) - - # Et on remplace la liste Python par une liste séparée par des virgules - lf = lf.with_columns(cs.by_dtype(pl.List(pl.String)).list.join(", ").name.keep()) - - return lf +from src.tasks.utils import check_parquet_file def explode_titulaires(lf: pl.LazyFrame, decp_format: DecpFormat): @@ -56,36 +27,7 @@ def explode_titulaires(lf: pl.LazyFrame, decp_format: DecpFormat): return lf -def remove_modifications_duplicates(df): - """On supprime les marches avec un suffixe correspondant à un autre marché""" - if "modifications" not in df.collect_schema().names(): - return df - # Index sans les suffixes - df_cleaned = remove_suffixes_from_uid_column(df) - df_cleaned = df_cleaned.with_columns( - modifications_len=pl.col("modifications").list.len(), - ) - - df_cleaned = df_cleaned.sort("modifications_len").unique("uid", keep="last") - return df_cleaned - - -def remove_suffixes_from_uid_column(df): - """Supprimer les suffixes des uid quand ce suffixe correspond au nombre de mofifications apportées au marché. - Exemple : uid = [acheteur_id]12302 et le marché a deux modifications. uid => [acheteur_id]123. - """ - df = df.with_columns( - expected_suffix=pl.col("modifications").list.len().cast(pl.Utf8).str.zfill(2) - ) - df = df.with_columns( - uid=pl.when(pl.col("uid").str.ends_with(pl.col("expected_suffix"))) - .then(pl.col("uid").str.head(-2)) - .otherwise(pl.col("uid")) - ) - return df - - -def replace_with_modification_data(lf: pl.LazyFrame): +def replace_with_modification_data(lff: pl.LazyFrame): """ Gère les modifications dans le DataFrame des DECP. À ce stade les modifications ont été exploded dans write_marche_rows(). @@ -94,16 +36,16 @@ def replace_with_modification_data(lf: pl.LazyFrame): Elle ajoute également la colonne "donneesActuelles" pour indiquer si la modification est la plus récente. """ # Étape 1: Extraire les données des modifications en renommant les colonnes - schema = lf.collect_schema().names() + schema = lff.collect_schema().names() lf_mods = ( - lf.select(cs.by_name("uid") | cs.starts_with("modification_")) + lff.select(cs.by_name("uid") | cs.starts_with("modification_")) .rename( { column: column.removeprefix("modification_").removesuffix( "Modification" ) for column in schema - if column.startswith("modification_") + if column.startswith("modification_") and column != "modification_id" } ) .filter(~pl.all_horizontal(pl.all().exclude("uid").is_null())) @@ -112,10 +54,10 @@ def replace_with_modification_data(lf: pl.LazyFrame): # Étape 2: Dédupliquer et créer une copie du DataFrame initial sans les colonnes "modifications" # On peut dédupliquer aveuglément car la seule chose qui varient dans les lignes d'un même # uid, c'est les données de modifs - lf = lf.unique("uid") + lff = lff.unique("uid") # Garder TOUTES les colonnes sauf les colonnes modification_* - lf_base = lf.drop(cs.starts_with("modification_")) + lf_base = lff.drop(cs.starts_with("modification_")) # Étape 3: Ajouter le modification_id et la colonne données actuelles pour chaque modif # Colonnes qui peuvent changer avec les modifications @@ -128,35 +70,36 @@ def replace_with_modification_data(lf: pl.LazyFrame): "titulaires", ] - lf = ( - pl.concat( - [ - lf_base.select(modification_columns), - lf_mods, - ], - how="vertical_relaxed", - ) - .with_columns( - pl.col("dateNotification") - .rank(method="ordinal") - .over("uid") - .cast(pl.Int64) - .sub(1) - .alias("modification_id") - ) - .with_columns( - ( - pl.col("modification_id") == pl.col("modification_id").max().over("uid") - ).alias("donneesActuelles") - ) - .sort( - ["uid", "dateNotification", "modification_id"], - descending=[False, True, True], - ) + lff = pl.concat( + [ + lf_base.select(modification_columns), + lf_mods, + ], + how="diagonal", + ) + + lff = lff.with_columns( + pl.col("dateNotification") + .rank(method="ordinal") + .over("uid") + .cast(pl.Int16) + .sub(1) + .alias("modification_id") + ) + + lff = lff.with_columns( + ( + pl.col("modification_id") == pl.col("modification_id").max().over("uid") + ).alias("donneesActuelles") + ) + + lff = lff.sort( + ["uid", "dateNotification", "modification_id"], + descending=[False, True, True], ) # Étape 4: Remplir les valeurs nulles en utilisant les dernières valeurs non-nulles pour chaque id - lf = lf.with_columns( + lff = lff.with_columns( pl.col("montant", "dureeMois", "titulaires") .fill_null(strategy="backward") .over("uid") @@ -174,7 +117,7 @@ def replace_with_modification_data(lf: pl.LazyFrame): lf_fixed_columns = lf_base.select(["uid"] + columns_to_keep).unique("uid") # Joindre pour réintroduire les colonnes fixes - lf_final = lf.join( + lf_final = lff.join( lf_fixed_columns, on="uid", how="left", @@ -198,34 +141,57 @@ def process_modifications(lf: pl.LazyFrame) -> pl.LazyFrame: return lf -def concat_decp_json(dfs: list) -> pl.DataFrame: - df = pl.concat(dfs, how="diagonal_relaxed") +def concat_parquet_files(parquet_files: list) -> pl.LazyFrame: + # Concatenation par morceaux (chunks) pour éviter de charger trop de fichiers en mémoire + # et pour éviter "OSError: Too many open files" - del dfs + # Mise de côté des parquet + # - qui n'existent pas (s'il y a eu une erreur par exemple) + # - qui ont une hauteur de 0 + checked_parquet_files = [file for file in parquet_files if check_parquet_file(file)] + + chunk_size = 500 + chunks = [ + checked_parquet_files[i : i + chunk_size] + for i in range(0, len(checked_parquet_files), chunk_size) + ] + + intermediate_files = [] + for i, chunk in enumerate(chunks): + print(f"Concatenation du chunk {i + 1}/{len(chunks)}") + lfs = [pl.scan_parquet(file) for file in chunk] + lf_chunk = pl.concat(lfs, how="vertical") + + # On sauvegarde chaque chunk concaténé + chunk_path = DATA_DIR / "temp" / f"chunk_{i}.parquet" + chunk_path.parent.mkdir(parents=True, exist_ok=True) + + # Utilisation de sink_parquet pour écrire sans tout charger en RAM + lf_chunk.sink_parquet(chunk_path, engine="streaming") + intermediate_files.append(chunk_path) + + # Concatenation finale des fichiers intermédiaires + print("Concatenation finale...") + lfs = [pl.scan_parquet(file) for file in intermediate_files if Path(file).exists()] + lf_concat: pl.LazyFrame = pl.concat(lfs, how="vertical") print( "Suppression des lignes en doublon par UID + titulaire ID + titulaire type ID + modification_id" ) # Exemple de doublon : 20005584600014157140791205100 - index_size_before = df.height - df_clean = df.unique( + lf_concat = lf_concat.unique( subset=["uid", "titulaire_id", "titulaire_typeIdentifiant", "modification_id"], maintain_order=False, ) - del df - - print("-- ", index_size_before - df_clean.height, " doublons supprimés") - - return df_clean + return lf_concat def extract_unique_acheteurs_siret(lf: pl.LazyFrame): # Extraction des SIRET des DECP dans une copie du df de base lf = lf.select("acheteur_id") lf = lf.unique() - lf = lf.sort(by="acheteur_id") return lf @@ -235,25 +201,73 @@ def extract_unique_titulaires_siret(lf: pl.LazyFrame): lf = lf.unique().filter( pl.col("titulaire_id") != "", pl.col("titulaire_typeIdentifiant") == "SIRET" ) - lf = lf.sort(by="titulaire_id") return lf -@task -def get_prepare_unites_legales(processed_parquet_path): - print("Téléchargement des données unité légales et sélection des colonnes...") - ( - pl.scan_parquet(SIRENE_UNITES_LEGALES_URL) - .select(["siren", "denominationUniteLegale"]) - .filter(pl.col("siren").is_not_null()) - .filter(pl.col("denominationUniteLegale").is_not_null()) - .unique() - .sink_parquet(processed_parquet_path) +def prepare_unites_legales(lf: pl.LazyFrame) -> pl.LazyFrame: + return ( + lf.select( + [ + "siren", + "denominationUniteLegale", + "prenomUsuelUniteLegale", + "nomUniteLegale", # toujours rempli pour personnes physique + "nomUsageUniteLegale", # parfois rempli, a la priorité sur nomUniteLegale + "statutDiffusionUniteLegale", # P = non-diffusible + ] + ) + .filter( + pl.col("siren").is_not_null() + ) # utilisation du fichier Stock, normalement pas de siren null + .unique() # utilisation du fichier Stock, normalement pas de doublons + .with_columns( + pl.when(pl.col("nomUsageUniteLegale").is_not_null()) + .then(pl.col("nomUsageUniteLegale")) + .otherwise(pl.col("nomUniteLegale")) + .alias("nomUniteLegale") + ) + .with_columns( + pl.when(pl.col("nomUniteLegale").is_not_null()) + .then( + pl.concat_str( + pl.col("prenomUsuelUniteLegale"), + pl.col("nomUniteLegale"), + separator=" ", + ) + ) + .otherwise(pl.col("denominationUniteLegale")) + .alias("denominationUniteLegale") + ) + .with_columns( + pl.when(pl.col("statutDiffusionUniteLegale") == "P") + .then(pl.lit("[Données personnelles non-diffusibles]")) + .otherwise(pl.col("denominationUniteLegale")) + .alias("denominationUniteLegale") + ) + .drop( + [ + "prenomUsuelUniteLegale", + "statutDiffusionUniteLegale", + "nomUniteLegale", + "nomUsageUniteLegale", + ] + ) ) -def prepare_etablissements(lf: pl.LazyFrame, processed_parquet_path: Path) -> None: - lf = lf.rename( +def prepare_etablissements(lff: pl.LazyFrame) -> pl.LazyFrame: + lff = lff.with_columns( + [ + pl.col("codeCommuneEtablissement").str.pad_start(5, "0"), + pl.col("siret").str.pad_start(14, "0"), + # Si enseigne1Etablissement est null, on utilise denominationUsuelleEtablissement + pl.coalesce( + "enseigne1Etablissement", "denominationUsuelleEtablissement" + ).alias("etablissement_nom"), + ] + ) + lff = lff.drop("denominationUsuelleEtablissement", "enseigne1Etablissement") + lff = lff.rename( { "codeCommuneEtablissement": "commune_code", "activitePrincipaleEtablissement": "activite_code", @@ -261,28 +275,32 @@ def prepare_etablissements(lf: pl.LazyFrame, processed_parquet_path: Path) -> No } ) - # Ajout des noms de départements, noms régions, + # Ajout des noms de commune, départements, régions lf_cog = pl.scan_parquet(DATA_DIR / "code_officiel_geographique.parquet") - lf = lf.join(lf_cog, on="commune_code", how="left") + lff = lff.join(lf_cog, on="commune_code", how="left") - lf.sink_parquet(processed_parquet_path) + return lff -def sort_columns(df: pl.DataFrame, config_columns): +def sort_columns(lf: pl.LazyFrame, config_columns): # Les colonnes présentes mais absentes des colonnes attendues sont mises à la fin de la liste + schema = lf.collect_schema() other_columns = [] - for col in df.columns: + for col in schema.keys(): if col not in config_columns: other_columns.append(col) print("Colonnes inattendues:", other_columns) - return df.select(config_columns + other_columns) + lf = lf.select(config_columns + other_columns) + lf = lf.sort( + by=["dateNotification", "uid"], descending=[True, False], nulls_last=True + ) + return lf -def calculate_naf_cpv_matching(df: pl.DataFrame): - lf_naf_cpv = df.lazy() +def calculate_naf_cpv_matching(lf_naf_cpv: pl.LazyFrame): # Unité de base pour le comptage : dernière version d'un marché attribué (donc pas forcément attributaire initial) lf_naf_cpv = ( lf_naf_cpv.select( @@ -328,7 +346,7 @@ def calculate_naf_cpv_matching(df: pl.DataFrame): ) # Pas de pivot en Lazy, donc on repasse en DataFrame - df_occurences_cpv = lf_naf_cpv.collect() + df_occurences_cpv = lf_naf_cpv.collect(engine="streaming") df_occurences_cpv = df_occurences_cpv.pivot( index="activite", on="cpv_code", values="compte", aggregate_function=None ) @@ -406,12 +424,21 @@ def add_duree_restante(lff: pl.LazyFrame): end_date = pl.col("dateNotification") + pl.duration(days=duree_mois_days_int) duree_restante_mois = ((end_date - today).dt.total_days() / 30).round(1) + # Pas de valeurs négatives. lff = lff.with_columns( pl.when(duree_restante_mois < 0) .then(pl.lit(0)) .otherwise(duree_restante_mois) .alias("dureeRestanteMois") ) + + # Si dureeRestanteMois > dureeMois, dureeRestanteMois = dureeMois + lff = lff.with_columns( + pl.when(pl.col("dureeRestanteMois") > pl.col("dureeMois")) + .then(pl.col("dureeMois").cast(pl.Float32)) + .otherwise(pl.col("dureeRestanteMois")) + .alias("dureeRestanteMois") + ) return lff @@ -420,81 +447,80 @@ def add_duree_restante(lff: pl.LazyFrame): # -def make_acheteur_nom(decp_acheteurs_df: pl.LazyFrame): - # Construction du champ acheteur_id - - from numpy import nan as NaN - - def construct_nom(row): - if row["enseigne1Etablissement"] is NaN: - return row["denominationUniteLegale"] - else: - return f"{row['denominationUniteLegale']} - {row['enseigne1Etablissement']}" - - decp_acheteurs_df["acheteur_id"] = decp_acheteurs_df.apply(construct_nom, axis=1) - - # TODO: ne garder que les colonnes acheteur_id et acheteur_id - - return decp_acheteurs_df - - -def improve_titulaire_unite_legale_data(df_sirets_titulaires: pl.DataFrame): - # Raccourcissement du code commune - df_sirets_titulaires["departement"] = df_sirets_titulaires[ - "codeCommuneEtablissement" - ].str[:2] - df_sirets_titulaires = df_sirets_titulaires.drop( - columns=["codeCommuneEtablissement"] - ) - - # # Raccourcissement de l'activité principale - # pas sûr de pourquoi je voulais raccourcir le code NAF/APE. Pour récupérérer des libellés ? - # decp_titulaires_sirets_df['activitePrincipaleEtablissement'] = decp_titulaires_sirets_df['activitePrincipaleEtablissement'].str[:-3] - - # Correction des données ESS et état - df_sirets_titulaires["etatAdministratifUniteLegale"] = df_sirets_titulaires[ - "etatAdministratifUniteLegale" - ].cat.rename_categories({"A": "Active", "C": "Cessée"}) - df_sirets_titulaires["economieSocialeSolidaireUniteLegale"] = df_sirets_titulaires[ - "economieSocialeSolidaireUniteLegale" - ].replace({"O": "Oui", "N": "Non"}) - - df_sirets_titulaires = improve_categories_juridiques(df_sirets_titulaires) - - return df_sirets_titulaires - - -def improve_categories_juridiques(df_sirets_titulaires: pl.DataFrame): - # Récupération et raccourcissement des categories juridiques du fichier SIREN - df_sirets_titulaires["categorieJuridiqueUniteLegale"] = ( - df_sirets_titulaires["categorieJuridiqueUniteLegale"].astype(str).str[:2] - ) - - # Récupération des libellés des catégories juridiques - cj_df = pl.read_csv(DATA_DIR / "cj.csv", index_col=None, dtype="object") - df_sirets_titulaires = pl.merge( - df_sirets_titulaires, - cj_df, - how="left", - left_on="categorieJuridiqueUniteLegale", - right_on="Code", - ) - df_sirets_titulaires["categorieJuridique"] = df_sirets_titulaires["Libellé"] - df_sirets_titulaires = df_sirets_titulaires.drop( - columns=["Code", "categorieJuridiqueUniteLegale", "Libellé"] - ) - return df_sirets_titulaires - - -def rename_titulaire_sirene_columns(df_sirets_titulaires: pl.DataFrame): - # Renommage des colonnes - - renaming = { - "activitePrincipaleEtablissement": "codeAPE", - "etatAdministratifUniteLegale": "etatEntreprise", - "etatAdministratifEtablissement": "etatEtablissement", - } - - df_sirets_titulaires = df_sirets_titulaires.rename(columns=renaming) - - return df_sirets_titulaires +# def make_acheteur_nom(decp_acheteurs_df: pl.LazyFrame): +# # Construction du champ acheteur_id +# +# from numpy import nan as NaN +# +# def construct_nom(row): +# if row["enseigne1Etablissement"] is NaN: +# return row["denominationUniteLegale"] +# else: +# return f"{row['denominationUniteLegale']} - {row['enseigne1Etablissement']}" +# +# decp_acheteurs_df["acheteur_id"] = decp_acheteurs_df.apply(construct_nom, axis=1) +# +# +# return decp_acheteurs_df +# +# +# def improve_titulaire_unite_legale_data(df_sirets_titulaires: pl.DataFrame): +# # Raccourcissement du code commune +# df_sirets_titulaires["departement"] = df_sirets_titulaires[ +# "codeCommuneEtablissement" +# ].str[:2] +# df_sirets_titulaires = df_sirets_titulaires.drop( +# columns=["codeCommuneEtablissement"] +# ) +# +# # # Raccourcissement de l'activité principale +# # pas sûr de pourquoi je voulais raccourcir le code NAF/APE. Pour récupérérer des libellés ? +# # decp_titulaires_sirets_df['activitePrincipaleEtablissement'] = decp_titulaires_sirets_df['activitePrincipaleEtablissement'].str[:-3] +# +# # Correction des données ESS et état +# df_sirets_titulaires["etatAdministratifUniteLegale"] = df_sirets_titulaires[ +# "etatAdministratifUniteLegale" +# ].cat.rename_categories({"A": "Active", "C": "Cessée"}) +# df_sirets_titulaires["economieSocialeSolidaireUniteLegale"] = df_sirets_titulaires[ +# "economieSocialeSolidaireUniteLegale" +# ].replace({"O": "Oui", "N": "Non"}) +# +# df_sirets_titulaires = improve_categories_juridiques(df_sirets_titulaires) +# +# return df_sirets_titulaires +# +# +# def improve_categories_juridiques(df_sirets_titulaires: pl.DataFrame): +# # Récupération et raccourcissement des categories juridiques du fichier SIREN +# df_sirets_titulaires["categorieJuridiqueUniteLegale"] = ( +# df_sirets_titulaires["categorieJuridiqueUniteLegale"].astype(str).str[:2] +# ) +# +# # Récupération des libellés des catégories juridiques +# cj_df = pl.read_csv(DATA_DIR / "cj.csv", index_col=None, dtype="object") +# df_sirets_titulaires = pl.merge( +# df_sirets_titulaires, +# cj_df, +# how="left", +# left_on="categorieJuridiqueUniteLegale", +# right_on="Code", +# ) +# df_sirets_titulaires["categorieJuridique"] = df_sirets_titulaires["Libellé"] +# df_sirets_titulaires = df_sirets_titulaires.drop( +# columns=["Code", "categorieJuridiqueUniteLegale", "Libellé"] +# ) +# return df_sirets_titulaires +# +# +# def rename_titulaire_sirene_columns(df_sirets_titulaires: pl.DataFrame): +# # Renommage des colonnes +# +# renaming = { +# "activitePrincipaleEtablissement": "codeAPE", +# "etatAdministratifUniteLegale": "etatEntreprise", +# "etatAdministratifEtablissement": "etatEtablissement", +# } +# +# df_sirets_titulaires = df_sirets_titulaires.rename(columns=renaming) +# +# return df_sirets_titulaires diff --git a/src/tasks/utils.py b/src/tasks/utils.py index c16f3fc..9757c20 100644 --- a/src/tasks/utils.py +++ b/src/tasks/utils.py @@ -12,7 +12,7 @@ CACHE_EXPIRATION_TIME_HOURS, DATE_NOW, DIST_DIR, - PREFECT_LOCAL_STORAGE_PATH, + RESOURCE_CACHE_DIR, SIRENE_DATA_DIR, TRACKED_DATASETS, DecpFormat, @@ -69,17 +69,20 @@ def get_clean_cache_key(context, parameters) -> str: @task def remove_unused_cache( - cache_dir: Path = PREFECT_LOCAL_STORAGE_PATH, + cache_dir: Path = RESOURCE_CACHE_DIR, cache_expiration_time_hours: int = CACHE_EXPIRATION_TIME_HOURS, ): now = time.time() age_limit = cache_expiration_time_hours * 3600 # seconds + deleted_files = [] if cache_dir.exists(): for file in cache_dir.rglob("*"): if file.is_file(): if now - file.stat().st_atime > age_limit: - print(f"Deleting cache file: {file}") + print(f"Suppression du fichier de cache: {file}") + deleted_files.append(file) file.unlink() + print(f"{len(deleted_files)} fichiers supprimés") # @@ -119,10 +122,10 @@ def gen_artifact_row( # Statistiques pour toutes les données collectées ce jour -def generate_stats(df: pl.DataFrame): +def generate_stats(lf: pl.LazyFrame): now = datetime.now() - df_uid: pl.DataFrame = ( - df.select( + lf_uid = ( + lf.select( "uid", "acheteur_id", "dateNotification", @@ -137,46 +140,108 @@ def generate_stats(df: pl.DataFrame): ) # Statistiques sur les sources de données (statistiques.csv) - generate_public_source_stats(df_uid) + # generate_public_source_stats does aggregations. Let's make it lazy too. + generate_public_source_stats(lf_uid) - resources = df_uid["sourceFile"].unique().to_list() + # Collect only the necessary aggregates for the main stats + # We need to compute several things. It might be efficient to do one big aggregation or several small collects. + + # 1. Resources and Sources + resources = ( + lf_uid.select("sourceFile") + .unique() + .collect() + .get_column("sourceFile") + .to_list() + ) + sources = ( + lf_uid.select("sourceDataset") + .unique() + .collect() + .get_column("sourceDataset") + .to_list() + ) + + # 2. Counts + nb_lignes = lf.select(pl.len()).collect().item() + nb_marches = lf_uid.select(pl.len()).collect().item() + + # 3. Unique counts (approximate or exact) + nb_acheteurs_uniques = ( + lf_uid.select("acheteur_id").unique().select(pl.len()).collect().item() - 1 + ) + nb_titulaires_uniques = ( + lf.select("titulaire_id", "titulaire_typeIdentifiant") + .unique() + .select(pl.len()) + .collect() + .item() + - 1 + ) + + # 4. Columns + columns = lf.collect_schema().names() stats = { "datetime": now.isoformat()[:-7], # jusqu'aux secondes "date": DATE_NOW, "resources": resources, "nb_resources": len(resources), - "sources": df_uid["sourceDataset"].unique().to_list(), - "nb_lignes": df.height, - "colonnes_triées": sorted(df.columns), - "nb_colonnes": len(df.columns), - "nb_marches": df_uid.height, - "nb_acheteurs_uniques": df_uid.select("acheteur_id").unique().height - - 1, # -1 pour ne pas compter la valeur "acheteur vide" - "nb_titulaires_uniques": df.select("titulaire_id", "titulaire_typeIdentifiant") - .unique() - .height - - 1, # -1 pour ne pas compter la valeur "titulaire vide" + "sources": sources, + "nb_lignes": nb_lignes, + "colonnes_triées": sorted(columns), + "nb_colonnes": len(columns), + "nb_marches": nb_marches, + "nb_acheteurs_uniques": nb_acheteurs_uniques, + "nb_titulaires_uniques": nb_titulaires_uniques, } - for year in range(2018, int(DATE_NOW[0:4]) + 1): - stats[f"{str(year)}_nb_publications_marchés"] = df_uid.filter( - pl.col("datePublicationDonnees").dt.year() == year - ).height + # 5. Yearly stats + # We can do this with a group_by and then iterate over the result - df_date_notification = df_uid.filter( - pl.col("dateNotification").dt.year() == year + # Aggregations for publications per year + pub_stats = ( + lf_uid.with_columns(pl.col("datePublicationDonnees").dt.year().alias("year")) + .group_by("year") + .agg(pl.len().alias("count")) + .collect() + ) + + # Aggregations for notifications per year (count, sum, median) + notif_stats = ( + lf_uid.with_columns(pl.col("dateNotification").dt.year().alias("year")) + .group_by("year") + .agg( + pl.len().alias("count"), + pl.sum("montant").alias("sum_montant"), + pl.median("montant").alias("median_montant"), ) - stats[f"{str(year)}_nb_notifications_marchés"] = df_date_notification.height + .collect() + ) - if df_date_notification.height > 0: + for year in range(2018, int(DATE_NOW[0:4]) + 1): + # Publications + pub_count = ( + pub_stats.filter(pl.col("year") == year).select("count").item() + if not pub_stats.filter(pl.col("year") == year).is_empty() + else 0 + ) + stats[f"{str(year)}_nb_publications_marchés"] = pub_count + + # Notifications + year_stats = notif_stats.filter(pl.col("year") == year) + if not year_stats.is_empty(): + stats[f"{str(year)}_nb_notifications_marchés"] = year_stats.select( + "count" + ).item() stats[f"{str(year)}_somme_montant_marchés_notifiés"] = int( - df_date_notification.select(pl.sum("montant")).item() + year_stats.select("sum_montant").item() or 0 ) stats[f"{str(year)}_médiane_montant_marchés_notifiés"] = int( - df_date_notification.select(pl.median("montant")).item() + year_stats.select("median_montant").item() or 0 ) else: + stats[f"{str(year)}_nb_notifications_marchés"] = 0 stats[f"{str(year)}_somme_montant_marchés_notifiés"] = "" stats[f"{str(year)}_médiane_montant_marchés_notifiés"] = "" @@ -188,23 +253,31 @@ def generate_stats(df: pl.DataFrame): ) -def generate_public_source_stats(df_uid: pl.DataFrame) -> None: +def generate_public_source_stats(lf_uid: pl.LazyFrame) -> None: print("Génération des statistiques sur les sources de données...") - df_uid = df_uid.select("uid", "acheteur_id", "sourceDataset") + lf_uid = lf_uid.select("uid", "acheteur_id", "sourceDataset") + # We need to collect these intermediate aggregations to join them with the sources dataframe (which is small) df_acheteurs = ( - df_uid.select("acheteur_id", "sourceDataset") + lf_uid.select("acheteur_id", "sourceDataset") .unique() .group_by("sourceDataset") .len() + .collect() ) df_acheteurs = df_acheteurs.rename({"len": "nb_acheteurs"}) # group + count - df_uid = ( - df_uid.select("uid", "sourceDataset").unique().group_by("sourceDataset").len() + df_uid_agg = ( + lf_uid.select("uid", "sourceDataset") + .unique() + .group_by("sourceDataset") + .len() + .collect() + ) + df_uid_agg = df_uid_agg.rename({"len": "nb_marchés"}).sort( + by="nb_marchés", descending=True ) - df_uid = df_uid.rename({"len": "nb_marchés"}).sort(by="nb_marchés", descending=True) # lecture des sources en df df_sources: pl.DataFrame = pl.DataFrame(TRACKED_DATASETS) @@ -229,7 +302,7 @@ def generate_public_source_stats(df_uid: pl.DataFrame) -> None: coalesce=True, ) df_sources = df_sources.join( - df_uid, left_on="code", right_on="sourceDataset", how="full", coalesce=True + df_uid_agg, left_on="code", right_on="sourceDataset", how="full", coalesce=True ) # ordre des colonnes @@ -252,3 +325,19 @@ def generate_public_source_stats(df_uid: pl.DataFrame) -> None: # dump CSV dans dist df_sources.write_csv(DIST_DIR / "statistiques.csv") + + +def full_resource_name(r: dict): + """Retourne le nom du fichier de la ressource et le nom du dataset.""" + return f"{r['ori_filename']} ({r['dataset_name']})" + + +def check_parquet_file(path) -> bool: + try: + lf = pl.scan_parquet(path) + height = lf.select(pl.count()).collect().item() + result = height > 0 + del lf + return result + except (FileNotFoundError, pl.exceptions.ComputeError): + return False diff --git a/template.env b/template.env index 6bfb992..1efb815 100644 --- a/template.env +++ b/template.env @@ -57,8 +57,8 @@ MAX_PREFECT_WORKERS="1" # La base de données doit être créée auparavant POSTGRESQL_DB_URI="postgresql://user:pass@server:port/database" -# Pour ignorer le cache Prefect mettre "true" -PREFECT_TASKS_REFRESH_CACHE="false" +# Pour ignorer le cache Prefect mettre "false" +DECP_USE_CACHE="true" # Timeout pour la publication de chaque ressource sur data.gouv.fr. Défaut : 300 (5 minutes) # DECP_PROCESSING_PUBLISH_TIMEOUT= diff --git a/tests/data/decp_test_2019.json b/tests/data/decp_test_2019.json index bed2b10..a415e6a 100644 --- a/tests/data/decp_test_2019.json +++ b/tests/data/decp_test_2019.json @@ -14,7 +14,7 @@ { "typeIdentifiant": "SIRET", "denominationSociale": "AMC FOLLIOT", - "id": "65265021900023" + "id": "12345678900022" } ] ] @@ -28,7 +28,7 @@ { "typeIdentifiant": "SIRET", "denominationSociale": "AMC FOLLIOT", - "id": "65265021900023" + "id": "12345678900022" } ], "id": "2019_83935401", @@ -68,7 +68,7 @@ "titulaires": [ { "typeIdentifiant": "SIRET", - "id": "34027049500021", + "id": "12345678900023", "denominationSociale": "FFF" }, { diff --git a/tests/data/decp_test_2022.json b/tests/data/decp_test_2022.json index 7d31bcb..f5073c9 100644 --- a/tests/data/decp_test_2022.json +++ b/tests/data/decp_test_2022.json @@ -30,7 +30,7 @@ { "titulaire": { "typeIdentifiant": "SIRET", - "id": "34027049500021" + "id": "12345678900023" } }, { @@ -184,7 +184,7 @@ { "titulaire": { "typeIdentifiant": "SIRET", - "id": "58211867500054" + "id": "12345678900022" } }, { diff --git a/tests/data/sirene/etablissements.parquet b/tests/data/sirene/etablissements.parquet new file mode 100644 index 0000000..f9c8531 Binary files /dev/null and b/tests/data/sirene/etablissements.parquet differ diff --git a/tests/data/sirene/unites_legales.parquet b/tests/data/sirene/unites_legales.parquet new file mode 100644 index 0000000..a1ed29f Binary files /dev/null and b/tests/data/sirene/unites_legales.parquet differ diff --git a/tests/test_clean.py b/tests/test_clean.py new file mode 100644 index 0000000..80cd685 --- /dev/null +++ b/tests/test_clean.py @@ -0,0 +1,292 @@ +import datetime + +import polars as pl + +from src.config import DecpFormat +from src.schemas import SCHEMA_MARCHE_2019, SCHEMA_MARCHE_2022 +from src.tasks.clean import ( + clean_decp, + clean_invalid_characters, + clean_null_equivalent, + clean_titulaires, + extract_innermost_struct, + fix_data_types, +) + + +def test_extract_innermost_struct(): + # Test case 1: Simple dictionary - returns None as it expects list + data = {"a": 1} + assert extract_innermost_struct(data) is None + + # Test case 2: Nested list with dictionary - returns the list containing the dict + data = [[{"a": 1}]] + assert extract_innermost_struct(data) == [{"a": 1}] + + # Test case 3: Deeply nested list + data = [[[[{"a": 1}]]]] + assert extract_innermost_struct(data) == [{"a": 1}] + + # Test case 4: Empty list + data = [] + assert extract_innermost_struct(data) == [] + + # Test case 5: List with empty list + data = [[]] + assert extract_innermost_struct(data) == [] + + # Test case 6: Fallback (not a list) + data = "string" + assert extract_innermost_struct(data) is None + + +def test_clean_invalid_characters(): + # Test case 1: String with no invalid characters + data = b"Hello World" + assert clean_invalid_characters(data) == data + + # Test case 2: String with invalid characters (e.g., null byte) + data = b"Hello\x00World" + assert clean_invalid_characters(data) == b"HelloWorld" + + # Test case 3: String with other control characters + data = b"Hello\x01World" + assert clean_invalid_characters(data) == b"HelloWorld" + + +def test_clean_null_equivalent(): + # Setup data + data = { + "considerationsSociales": ["Pas de considération sociale", "Oui", "Non"], + "considerationsEnvironnementales": [ + "Pas de considération environnementale", + "Oui", + "Non", + ], + "ccag": ["Pas de CCAG", "CCAG Travaux", "Autre"], + "typeGroupement": ["Pas de groupement", "Conjoint", "Solidaire"], + "other_col": ["A", "B", "C"], + } + lf = pl.LazyFrame(data) + + # Execute function + result = clean_null_equivalent(lf).collect() + + # Assertions + assert result["considerationsSociales"][0] == "Sans objet" + assert result["considerationsSociales"][1] == "Oui" + assert result["considerationsEnvironnementales"][0] == "Sans objet" + assert result["ccag"][0] == "Sans objet" + assert result["typeGroupement"][0] == "Sans objet" + assert result["other_col"][0] == "A" + + +def test_clean_titulaires(): + decp_format_2019 = DecpFormat("DECP 2019", SCHEMA_MARCHE_2019, "marches") + decp_format_2022 = DecpFormat("DECP 2022", SCHEMA_MARCHE_2022, "marches.marche") + + # Test DECP 2019 format + data_2019 = { + "titulaires": [ + [ + {"id": "1", "typeIdentifiant": "SIRET"}, + {"id": "2", "typeIdentifiant": "SIRET"}, + ], + [{"id": None, "typeIdentifiant": None}], # Should be filtered out + [], # Empty list + ], + "modification_titulaires": [[], [], []], + } + schema_2019 = { + "titulaires": pl.List(pl.Struct({"id": pl.Utf8, "typeIdentifiant": pl.Utf8})), + "modification_titulaires": pl.List( + pl.Struct({"id": pl.Utf8, "typeIdentifiant": pl.Utf8}) + ), + } + lf_2019 = pl.LazyFrame(data_2019, schema=schema_2019) + result_2019 = clean_titulaires(lf_2019, decp_format_2019).collect() + + titulaires_0 = result_2019["titulaires"][0] + assert len(titulaires_0) == 2 + assert titulaires_0[0]["titulaire_id"] == "1" + assert titulaires_0[0]["titulaire_typeIdentifiant"] == "SIRET" + + # Check that empty/null titulaires are handled (filtered out or result is null if empty list) + # The code filters elements where id AND typeIdentifiant are null. + # And then replaces empty lists with None. + + # Row 1: [{"id": None, "typeIdentifiant": None}] -> [] -> None + assert result_2019["titulaires"][1] is None + + # Row 2: [] -> [] -> None + assert result_2019["titulaires"][2] is None + + # Test DECP 2022 format + data_2022 = { + "titulaires": [ + [{"titulaire": {"id": "3", "typeIdentifiant": "SIRET"}}], + ], + "modification_titulaires": [[]], + } + schema_2022 = { + "titulaires": pl.List( + pl.Struct( + {"titulaire": pl.Struct({"id": pl.Utf8, "typeIdentifiant": pl.Utf8})} + ) + ), + "modification_titulaires": pl.List( + pl.Struct( + {"titulaire": pl.Struct({"id": pl.Utf8, "typeIdentifiant": pl.Utf8})} + ) + ), + } + lf_2022 = pl.LazyFrame(data_2022, schema=schema_2022) + result_2022 = clean_titulaires(lf_2022, decp_format_2022).collect() + + titulaires_0 = result_2022["titulaires"][0] + assert len(titulaires_0) == 1 + assert titulaires_0[0]["titulaire_id"] == "3" + + +def test_fix_data_types(): + data = { + "dureeMois": ["12", "invalid", None, None, None], + "montant": ["100.50", "invalid", "200", "300", "400"], + "dateNotification": [ + "2023-01-01", + "invalid-date", + "2023-12-31", + "2023-01-02", + "2023-01-03", + ], + "sousTraitanceDeclaree": ["true", "false", "Oui", "Non", "invalid"], + "marcheInnovant": ["1", "0", "invalid", "0", "1"], + "attributionAvance": ["true", "false", "invalid", "false", "true"], + "offresRecues": ["1", "2", "3", "4", "5"], + "tauxAvance": ["0", "0", "0", "0", "0"], + "origineFrance": ["0", "0", "0", "0", "0"], + "origineUE": ["0", "0", "0", "0", "0"], + "modification_id": ["0", "0", "0", "0", "0"], + "datePublicationDonnees": [ + "2023-01-01", + "2023-01-01", + "2023-01-01", + "2023-01-01", + "2023-01-01", + ], + } + lf = pl.LazyFrame(data) + result = fix_data_types(lf).collect() + + # Check numeric types + assert result["dureeMois"].dtype == pl.Int16 + assert result["dureeMois"][0] == 12 + assert result["dureeMois"][1] is None + + assert result["montant"].dtype == pl.Float64 + assert result["montant"][0] == 100.50 + assert result["montant"][1] is None + + # Check dates + assert result["dateNotification"].dtype == pl.Date + assert result["dateNotification"][0] == datetime.date(2023, 1, 1) + assert result["dateNotification"][1] is None + + # Check booleans + assert result["sousTraitanceDeclaree"].dtype == pl.Boolean + assert result["sousTraitanceDeclaree"][0] is True + assert result["sousTraitanceDeclaree"][1] is False + assert result["sousTraitanceDeclaree"][2] is True + assert result["sousTraitanceDeclaree"][3] is False + assert result["sousTraitanceDeclaree"][4] is None + + assert result["marcheInnovant"].dtype == pl.Boolean + assert result["marcheInnovant"][0] is True + assert result["marcheInnovant"][1] is False + + +def test_clean_decp(): + # Minimal test for clean_decp to ensure the pipeline runs + + # clean_decp expects certain columns + data = { + "id": ["id.1", "id/2", ""], + "acheteur_id": ["ach1", "ach2", ""], + "acheteur.id": ["", "ach2", ""], + "montant": ["1000", "1000000000000.00", "2000"], + "datePublicationDonnees": ["2023-01-01", "0002-11-30", "2023-01-02"], + "dateNotification": ["2023-01-01", "2023-01-01", "2023-01-01"], + "nature": ["Marche subsequent", "", "Autre"], + "codeCPV": ["12345678-1", "87654321", "11111111"], + "titulaires": [ + [{"id": "t1", "typeIdentifiant": "SIRET"}], + [{"id": "t2", "typeIdentifiant": "SIRET"}], + [], + ], + # On ne teste pas les modifications + "modification_titulaires": [None, None, None], + "modification_id": [None, None, None], + # Columns for string lists + "considerationsSociales_considerationSociale": [ + ["Clause sociale", "Critère social"], + ["Critère social"], + [""], + ], + "considerationsSociales": [[], [], []], + # Columns for fix_data_types + "dureeMois": ["12", "24", "36"], + "offresRecues": ["1", "2", "3"], + "tauxAvance": ["0", "0", "0"], + "origineFrance": ["0", "0", "0"], + "origineUE": ["0", "0", "0"], + "sousTraitanceDeclaree": ["false", "false", "false"], + "attributionAvance": ["false", "false", "false"], + "marcheInnovant": ["false", "false", "false"], + # Columns for clean_null_equivalent + "considerationsEnvironnementales": [ + ["Pas de considération environnementale"], + ["Clause environnementale"], + [""], + ], + "ccag": ["Pas de CCAG", "CCAG", ""], + "typeGroupement": ["Pas de groupement", "Solidaire", ""], + } + + lf = pl.LazyFrame(data) + + # Test with DECP 2019 + decp_format_2019 = DecpFormat("DECP 2019", SCHEMA_MARCHE_2019, "marches") + df_result: pl.DataFrame = clean_decp(lf, decp_format_2019).collect() + + # Check id cleaning + assert df_result.filter(pl.col("id") == "id_1").height > 0 + assert df_result.filter(pl.col("id") == "id_2").height > 0 + + # Empty id/acheteur_id should be filtered out + assert df_result.filter(pl.col("id") == "").height == 0 + + # Check uid creation + assert df_result["uid"].to_list() == ["ach1id_1", "ach2id_2"] + + # Check montant replacement + assert 12311111111.0 in df_result["montant"].to_list() + assert "1.0E17" not in df_result["montant"].to_list() + + # Check string lists + assert "considerationsSociales_considerationSociale" not in df_result + assert "considerationsSociales" in df_result + assert df_result["considerationsSociales"].to_list() == [ + "Clause sociale, Critère social", + "Critère social", + ] + + # Check null equivalent + assert df_result["considerationsEnvironnementales"][0] == "Sans objet" + assert df_result["ccag"][0] == "Sans objet" + assert df_result["typeGroupement"][0] == "Sans objet" + + # Check nature replacement + assert df_result["nature"][0] == "Marché subséquent" + + # Check codeCPV + assert df_result["codeCPV"][0] == "12345678" diff --git a/tests/test_enrich.py b/tests/test_enrich.py new file mode 100644 index 0000000..893f514 --- /dev/null +++ b/tests/test_enrich.py @@ -0,0 +1,39 @@ +import polars as pl +from polars.testing import assert_frame_equal + +from src.config import SIRENE_DATA_DIR +from src.tasks.enrich import add_etablissement_data + + +class TestEnrich: + def test_add_etablissement_data(self): + lf_sirets = pl.LazyFrame( + {"org_id": ["12345678900022", "12345678900023"], "org_nom": ["Org", "Org"]} + ) + + lf_etablissement = pl.scan_parquet(SIRENE_DATA_DIR / "etablissements.parquet") + + lf_output = pl.LazyFrame( + { + "org_id": ["12345678900022", "12345678900023"], + "org_latitude": [11.12, 11.12], + "org_longitude": [12.13, 12.13], + "org_commune_code": ["12345", "12345"], + "org_departement_code": ["12", "12"], + "org_region_code": ["01", "01"], + "org_commune_nom": ["Commune", "Commune"], + "org_departement_nom": ["Département", "Département"], + "org_region_nom": ["Région", "Région"], + "org_nom": ["Org (Établissement nom)", "Org"], + "activite_code": ["11.11A", "11.11B"], + "activite_nomenclature": ["NAFRev2", "NAFRev2"], + } + ) + + assert_frame_equal( + add_etablissement_data( + lf_sirets, lf_etablissement, "org_id", "org" + ).collect(), + lf_output.collect(), + check_column_order=False, + ) diff --git a/tests/test_get.py b/tests/test_get.py deleted file mode 100644 index db49be4..0000000 --- a/tests/test_get.py +++ /dev/null @@ -1,163 +0,0 @@ -from unittest import TestCase - -from tasks.get import clean_json - - -class TestGet: - def test_clean_json(self): - input_json = [ - { - "id": "2020-698182-01", - "acheteur": {"id": "22870851700989"}, - "dureeMois": 40, - "dateNotification": "2020-02-06", - "datePublicationDonnees": "2020-02-14", - "montant": 23004.0, - "titulaires": [ - {"titulaire": {"typeIdentifiant": "SIRET", "id": "83415751300245"}} - ], - "modifications": [ - { - "modification": { - "dateNotificationModification": "2023-06-07", - "dureeMois": 1, - "montant": 1, - "id": 1, - "titulaires": [ - {"titulaire": "typeIdentifiant"}, - {"titulaire": "id"}, - ], - "datePublicationDonneesModification": "2023-06-20", - } - }, - { - "modification": { - "dateNotificationModification": "2025-04-01", - "dureeMois": 1, - "montant": 22545, - "id": 2, - "datePublicationDonneesModification": "2025-04-01", - } - }, - { - "modification": { - "dateNotificationModification": "2025-04-01", - "dureeMois": 2, - "montant": 22540, - "id": 2, - "titulaires": [ - { - "titulaire": { - "typeIdentifiant": "SIRET", - "id": "58211867500054", - } - }, - { - "titulaire": { - "typeIdentifiant": "SIRET", - "id": "05650171100115", - } - }, - { - "titulaire": { - "typeIdentifiant": "SIRET", - "id": "88359829400030", - } - }, - ], - "datePublicationDonneesModification": "2025-04-01", - } - }, - { - "modification": { - "dateNotificationModification": "2027-04-01", - "dureeMois": 100, - "id": 3, - "datePublicationDonneesModification": "2027-04-01", - } - }, - ], - "origineFrance": 0.0, - } - ] - - # Expected DataFrame - expected_json = [ - { - "id": "2020-698182-01", - "acheteur": {"id": "22870851700989"}, - "dureeMois": 40, - "dateNotification": "2020-02-06", - "datePublicationDonnees": "2020-02-14", - "montant": 23004.0, - "titulaires": [ - {"titulaire": {"typeIdentifiant": "SIRET", "id": "83415751300245"}} - ], - "modifications": [ - { - "modification": { - "dateNotificationModification": "2023-06-07", - "dureeMois": 1, - "montant": 1, - "id": 1, - "datePublicationDonneesModification": "2023-06-20", - } - }, - { - "modification": { - "dateNotificationModification": "2025-04-01", - "dureeMois": 1, - "montant": 22545, - "id": 2, - "datePublicationDonneesModification": "2025-04-01", - } - }, - { - "modification": { - "dateNotificationModification": "2025-04-01", - "dureeMois": 2, - "montant": 22540, - "id": 2, - "titulaires": [ - { - "titulaire": { - "typeIdentifiant": "SIRET", - "id": "58211867500054", - } - }, - { - "titulaire": { - "typeIdentifiant": "SIRET", - "id": "05650171100115", - } - }, - { - "titulaire": { - "typeIdentifiant": "SIRET", - "id": "88359829400030", - } - }, - ], - "datePublicationDonneesModification": "2025-04-01", - } - }, - { - "modification": { - "dateNotificationModification": "2027-04-01", - "dureeMois": 100, - "id": 3, - "datePublicationDonneesModification": "2027-04-01", - } - }, - ], - "origineFrance": 0.0, - } - ] - - # Call the function - result_json = clean_json(input_json) - print("Result JSON:", result_json) - print("Expected JSON:", expected_json) - # Assert the result matches the expected DataFrame - for result, expected in zip(result_json, expected_json): - TestCase().assertDictEqual(expected, result) diff --git a/tests/test_main.py b/tests/test_main.py index 2d63165..991b0a1 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -4,5 +4,5 @@ class TestFlow: def test_decp_processing(self): - with prefect_test_harness(server_startup_timeout=10): + with prefect_test_harness(server_startup_timeout=30): decp_processing() diff --git a/tests/test_sirene_preprocess.py b/tests/test_sirene_preprocess.py deleted file mode 100644 index 68b3192..0000000 --- a/tests/test_sirene_preprocess.py +++ /dev/null @@ -1,8 +0,0 @@ -from flows.sirene_preprocess import sirene_preprocess -from tests.fixtures import prefect_test_harness - - -class TestFlow: - def test_sirene_preprocess(self): - with prefect_test_harness(server_startup_timeout=10): - sirene_preprocess() diff --git a/tests/test_transform.py b/tests/test_transform.py index 8c19a5b..e0fffe6 100644 --- a/tests/test_transform.py +++ b/tests/test_transform.py @@ -2,59 +2,151 @@ from polars.testing import assert_frame_equal from tasks.transform import ( - remove_modifications_duplicates, + prepare_etablissements, + prepare_unites_legales, replace_with_modification_data, ) -class TestHandleModificationsMarche: - def test_remove_modifications_duplicates(self): - df = pl.LazyFrame( - { - "uid": ["202401", "20240101", "20240102", "20240102", "2025010203"], - "modifications": [[], [1], [1, 2], [], []], - } +class TestPrepareUnitesLegales: + def test_prepare_unites_legales(self): + lf = pl.LazyFrame( + [ + # Cas 1: Personne morale + { + "siren": "111111111", + "denominationUniteLegale": "Org 1", + "prenomUsuelUniteLegale": None, + "nomUniteLegale": None, + "nomUsageUniteLegale": None, + "statutDiffusionUniteLegale": "O", + }, + # Cas 2: Personne physique avec nom d'usage + { + "siren": "222222222", + "denominationUniteLegale": None, + "prenomUsuelUniteLegale": "Ambroise", + "nomUniteLegale": "Croizat", + "nomUsageUniteLegale": "Zacroit", # a la priorité + "statutDiffusionUniteLegale": "O", + }, + # Cas 3: Personne physique sans nom d'usage + { + "siren": "333333333", + "denominationUniteLegale": None, + "prenomUsuelUniteLegale": "Ambroise", + "nomUniteLegale": "Croizat", + "nomUsageUniteLegale": None, + "statutDiffusionUniteLegale": "O", + }, + # Cas 4: Nom non-diffusible + { + "siren": "44444444", + "denominationUniteLegale": None, + "prenomUsuelUniteLegale": "Ambroise", + "nomUniteLegale": "Croizat", + "nomUsageUniteLegale": None, + "statutDiffusionUniteLegale": "P", + }, + ] ) - cleaned_df = remove_modifications_duplicates(df).collect() - assert len(cleaned_df) == 3 - assert cleaned_df.sort("uid")["uid"].to_list() == sorted( - ["202401", "20240102", "2025010203"] + # Expected DataFrame + expected_df = pl.DataFrame( + [ + # Cas 1: denominationUniteLegale est préservé + {"siren": "111111111", "denominationUniteLegale": "Org 1"}, + # Cas 2: denominationUniteLegale = prenom + nomUsage (Zacroit) + {"siren": "222222222", "denominationUniteLegale": "Ambroise Zacroit"}, + # Cas 3: denominationUniteLegale = prenom + nom (Croizat) + {"siren": "333333333", "denominationUniteLegale": "Ambroise Croizat"}, + # Cas 4: denominationUniteLegale = non-diffusible + { + "siren": "44444444", + "denominationUniteLegale": "[Données personnelles non-diffusibles]", + }, + ] ) - def test_handle_modifications_marche_all_cases(self): - # Input LazyFrame + # Application de la fonction + result_df = prepare_unites_legales(lf).collect() + + # Tri des df + result_df = result_df.sort("siren") + expected_df = expected_df.sort("siren") + + assert_frame_equal(result_df, expected_df) + + +class TestPrepareEtablissements: + def test_prepare_etablissements(self): lf = pl.LazyFrame( [ { - "uid": 1, + "siret": "11111111111", + "codeCommuneEtablissement": "1053", + "enseigne1Etablissement": None, + "denominationUsuelleEtablissement": "Dénom usuelle", + "activitePrincipaleEtablissement": "11.1A", + "nomenclatureActivitePrincipaleEtablissement": "NAFv2", + } + ] + ) + + expected_df = pl.DataFrame( + [ + { + "siret": "00011111111111", + "commune_code": "01053", + "etablissement_nom": "Dénom usuelle", + "activite_code": "11.1A", + "activite_nomenclature": "NAFv2", + "commune_nom": "Bourg-en-Bresse", + "departement_code": "01", + "region_code": "84", + "region_nom": "Auvergne-Rhône-Alpes", + "departement_nom": "Ain", + } + ] + ) + + assert_frame_equal( + prepare_etablissements(lf).collect(), + expected_df, + check_column_order=False, + check_dtypes=True, + ) + + +class TestHandleModificationsMarche: + def test_replace_with_modification_data(self): + # Input LazyFrame - 3 test cases covering key scenarios + lf = pl.LazyFrame( + [ + # Case 1: uid=1 with 2 modifications (changes to montant, dureeMois, titulaires) + { + "uid": "1", "montant": 1000, "dureeMois": 12, "acheteur_id": "12345", "titulaires": [ - {"titulaire": {"typeIdentifiant": "SIRET", "id": "00011"}}, - {"titulaire": {"typeIdentifiant": "SIRET", "id": "00012"}}, + {"titulaire": {"typeIdentifiant": "SIRET", "id": "00013"}} ], "dateNotification": "2023-01-01", "datePublicationDonnees": "2023-01-02", - "modification_dateNotificationModification": "2023-01-02", - "modification_datePublicationDonneesModification": "2023-01-03", - "modification_montant": 1000, - "modification_dureeMois": 15, - "modification_titulaires": [ - {"titulaire": {"typeIdentifiant": "SIRET", "id": "00012"}}, - {"titulaire": {"typeIdentifiant": "SIRET", "id": "00013"}}, - {"titulaire": {"typeIdentifiant": "SIRET", "id": "00014"}}, - ], + "modification_dateNotificationModification": None, + "modification_datePublicationDonneesModification": None, + "modification_montant": None, + "modification_dureeMois": None, + "modification_titulaires": None, }, { - "uid": 1, + "uid": "1", "montant": 1000, "dureeMois": 12, "acheteur_id": "12345", "titulaires": [ - {"titulaire": {"typeIdentifiant": "SIRET", "id": "00011"}}, - {"titulaire": {"typeIdentifiant": "SIRET", "id": "00012"}}, + {"titulaire": {"typeIdentifiant": "SIRET", "id": "00013"}} ], "dateNotification": "2023-01-01", "datePublicationDonnees": "2023-01-02", @@ -62,212 +154,75 @@ def test_handle_modifications_marche_all_cases(self): "modification_datePublicationDonneesModification": "2023-02-05", "modification_montant": 1500, "modification_dureeMois": 18, - "modification_titulaires": None, - }, - { - "uid": 2, - "montant": 2000, - "dureeMois": 24, - "acheteur_id": "88888", - "titulaires": [ - {"titulaire": {"typeIdentifiant": "SIRET", "id": "0002"}} - ], - "dateNotification": "2023-02-02", - "datePublicationDonnees": "2023-02-03", - "modification_dateNotificationModification": "2023-02-03", - "modification_datePublicationDonneesModification": "2023-02-04", - "modification_montant": None, - "modification_dureeMois": 12, - "modification_titulaires": None, - }, - { - "uid": 3, - "acheteur_id": "88888", - "montant": 10000, - "dureeMois": 36, - "titulaires": [ - {"titulaire": {"typeIdentifiant": "SIRET", "id": "0003"}} + "modification_titulaires": [ + {"titulaire": {"typeIdentifiant": "SIRET", "id": "00011"}}, + {"titulaire": {"typeIdentifiant": "SIRET", "id": "00012"}}, ], - "dateNotification": "2023-01-02", - "datePublicationDonnees": "2023-01-08", - "modification_dateNotificationModification": "2023-01-03", - "modification_datePublicationDonneesModification": "2023-01-12", - "modification_montant": 3000, - "modification_dureeMois": None, - "modification_titulaires": None, }, + # Case 2: uid=2 with no modifications (all modification fields are None) { - "uid": 4, - "acheteur_id": "77777", + "uid": "2", + "acheteur_id": "99999", "montant": 500, - "dureeMois": 10, + "dureeMois": 6, "titulaires": [ - {"titulaire": {"typeIdentifiant": "SIRET", "id": "0004"}} + {"titulaire": {"typeIdentifiant": "SIRET", "id": "0003"}} ], - "dateNotification": "2023-06-01", - "datePublicationDonnees": "2023-06-02", - "modification_dateNotificationModification": "2023-06-02", - "modification_datePublicationDonneesModification": "2023-06-03", + "dateNotification": "2023-03-01", + "datePublicationDonnees": "2023-03-02", + "modification_dateNotificationModification": None, + "modification_datePublicationDonneesModification": None, "modification_montant": None, "modification_dureeMois": None, "modification_titulaires": None, }, - { - "uid": 4, - "montant": 500, - "dureeMois": 10, - "acheteur_id": "77777", - "titulaires": [ - {"titulaire": {"typeIdentifiant": "SIRET", "id": "0004"}} - ], - "dateNotification": "2023-06-01", - "datePublicationDonnees": "2023-06-02", - "modification_dateNotificationModification": "2023-06-03", - "modification_datePublicationDonneesModification": "2023-06-04", - "modification_montant": 1500, - "modification_dureeMois": None, - "modification_titulaires": None, - }, ] ) # Expected DataFrame expected_df = pl.DataFrame( [ + # uid=1: 2 rows (original + 1 modification) { - "uid": 1, + "uid": "1", "dateNotification": "2023-02-04", "datePublicationDonnees": "2023-02-05", "montant": 1500, "dureeMois": 18, "titulaires": [ + {"titulaire": {"typeIdentifiant": "SIRET", "id": "00011"}}, {"titulaire": {"typeIdentifiant": "SIRET", "id": "00012"}}, - {"titulaire": {"typeIdentifiant": "SIRET", "id": "00013"}}, - {"titulaire": {"typeIdentifiant": "SIRET", "id": "00014"}}, - ], - "modification_id": 2, - "donneesActuelles": True, - "acheteur_id": "12345", - }, - { - "uid": 1, - "dateNotification": "2023-01-02", - "datePublicationDonnees": "2023-01-03", - "montant": 1000, - "dureeMois": 15, - "titulaires": [ - {"titulaire": {"typeIdentifiant": "SIRET", "id": "00012"}}, - {"titulaire": {"typeIdentifiant": "SIRET", "id": "00013"}}, - {"titulaire": {"typeIdentifiant": "SIRET", "id": "00014"}}, ], "modification_id": 1, - "donneesActuelles": False, + "donneesActuelles": True, "acheteur_id": "12345", }, { - "uid": 1, + "uid": "1", "dateNotification": "2023-01-01", "datePublicationDonnees": "2023-01-02", "montant": 1000, "dureeMois": 12, "titulaires": [ - {"titulaire": {"typeIdentifiant": "SIRET", "id": "00011"}}, - {"titulaire": {"typeIdentifiant": "SIRET", "id": "00012"}}, + {"titulaire": {"typeIdentifiant": "SIRET", "id": "00013"}}, ], "modification_id": 0, "donneesActuelles": False, "acheteur_id": "12345", }, + # uid=2: 1 row (no modifications) { - "uid": 2, - "dateNotification": "2023-02-03", - "datePublicationDonnees": "2023-02-04", - "montant": 2000, - "dureeMois": 12, - "titulaires": [ - {"titulaire": {"typeIdentifiant": "SIRET", "id": "0002"}} - ], - "modification_id": 1, - "donneesActuelles": True, - "acheteur_id": "88888", - }, - { - "uid": 2, - "dateNotification": "2023-02-02", - "datePublicationDonnees": "2023-02-03", - "montant": 2000, - "dureeMois": 24, - "titulaires": [ - {"titulaire": {"typeIdentifiant": "SIRET", "id": "0002"}} - ], - "modification_id": 0, - "donneesActuelles": False, - "acheteur_id": "88888", - }, - { - "uid": 3, - "dateNotification": "2023-01-03", - "datePublicationDonnees": "2023-01-12", - "montant": 3000, - "dureeMois": 36, - "titulaires": [ - {"titulaire": {"typeIdentifiant": "SIRET", "id": "0003"}} - ], - "modification_id": 1, - "donneesActuelles": True, - "acheteur_id": "88888", - }, - { - "uid": 3, - "dateNotification": "2023-01-02", - "datePublicationDonnees": "2023-01-08", - "montant": 10000, - "dureeMois": 36, + "uid": "2", + "dateNotification": "2023-03-01", + "datePublicationDonnees": "2023-03-02", + "montant": 500, + "dureeMois": 6, "titulaires": [ {"titulaire": {"typeIdentifiant": "SIRET", "id": "0003"}} ], "modification_id": 0, - "donneesActuelles": False, - "acheteur_id": "88888", - }, - { - "uid": 4, - "dateNotification": "2023-06-03", - "datePublicationDonnees": "2023-06-04", - "montant": 1500, - "dureeMois": 10, - "titulaires": [ - {"titulaire": {"typeIdentifiant": "SIRET", "id": "0004"}} - ], - "modification_id": 2, "donneesActuelles": True, - "acheteur_id": "77777", - }, - { - "uid": 4, - "dateNotification": "2023-06-02", - "datePublicationDonnees": "2023-06-03", - "montant": 500, - "dureeMois": 10, - "titulaires": [ - {"titulaire": {"typeIdentifiant": "SIRET", "id": "0004"}} - ], - "modification_id": 1, - "donneesActuelles": False, - "acheteur_id": "77777", - }, - { - "uid": 4, - "dateNotification": "2023-06-01", - "datePublicationDonnees": "2023-06-02", - "montant": 500, - "dureeMois": 10, - "titulaires": [ - {"titulaire": {"typeIdentifiant": "SIRET", "id": "0004"}} - ], - "modification_id": 0, - "donneesActuelles": False, - "acheteur_id": "77777", + "acheteur_id": "99999", }, ] ) @@ -275,31 +230,18 @@ def test_handle_modifications_marche_all_cases(self): # Call the function result_df = replace_with_modification_data(lf).collect() - print( - expected_df["uid", "dateNotification", "montant", "modification_id"] - .to_pandas() - .to_string() - ) + # print( + # expected_df["uid", "dateNotification", "montant", "modification_id"] + # .to_pandas() + # .to_string() + # ) - print( - result_df["uid", "dateNotification", "montant", "modification_id"] - .to_pandas() - .to_string() - ) + # print( + # result_df["uid", "dateNotification", "montant", "modification_id"] + # .to_pandas() + # .to_string() + # ) # Assert the result matches the expected DataFrame assert_frame_equal( result_df, expected_df, check_column_order=False, check_dtypes=False ) - - -# class TestNafCsvMatching: -# def test_calculate_naf_cpv_matching(self): -# df = pl.DataFrame({ -# "uid": ["1", "1", "2", "3", "4", "pas_actuel"], -# "codeCPV": ["cpv1", "cpv2", "cpv2", "cpv2", "cpv3", "cpv3"], -# "activite_code": ["naf1", "naf1", "naf1", "naf1", "naf1", "naf1"], -# "activite_nomenclature": ["nom1", "nom1", "nom1", "nom1", "nom1", "nom1"], -# "donneesActuelles": [True, False, True, True, True, True], -# }) -# -# df = calculate_naf_cpv_matching(df)