diff --git a/CHANGELOG.md b/CHANGELOG.md index 322bf53..473ec5f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +### 2.6.2 2025-12-15 + +- Réduction du nombre de tâches prefect pour réduire la charge sur la BDD et la latence +- Utilisation du multithreading standard de Python plutôt que celui de Prefect +- Le nom d'établissement n'est ajouté entre parenthèses que s'il est différent de celui de l'unité légale + ### 2.6.1 2025-12-14 - Séparation des fichiers de référence et des fichiers de données diff --git a/README.md b/README.md index b3ad214..9153400 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # DECP processing -> version 2.6.0 ([notes de version](https://github.com/ColinMaudry/decp-processing/blob/main/CHANGELOG.md)) +> version 2.6.2 ([notes de version](https://github.com/ColinMaudry/decp-processing/blob/main/CHANGELOG.md)) Projet de traitement et de publication de meilleures données sur les marchés publics attribués en France. Vous pouvez consulter, filtrer et télécharger ces données sur le site [decp.info](https://decp.info). Enfin la section [À propos](https://decp.info/a-propos) décrit les objectifs du projet et regroupe toutes les informations clés. diff --git a/pyproject.toml b/pyproject.toml index 73748ee..ca90f06 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [project] name = "decp-processing" description = "Traitement des données des marchés publics français." -version = "2.5.0" +version = "2.6.2" requires-python = ">= 3.9" authors = [ { name = "Colin Maudry", email = "colin+decp@maudry.com" } @@ -22,7 +22,8 @@ dependencies = [ "bs4", "selenium", "polars_ds", - "scikit-learn" + "scikit-learn", + "tenacity" ] [project.optional-dependencies] diff --git a/src/config.py b/src/config.py index 5d29c52..93583ad 100644 --- a/src/config.py +++ b/src/config.py @@ -26,20 +26,15 @@ def make_path_from_env(env: str, alternative_path: Path) -> Path: return Path(os.getenv(env) or alternative_path) -print(""" -########## -# Config # -########## -""") - +ALL_CONFIG = {} # Nombre maximal de workers utilisables par Prefect. Défaut : 16 MAX_PREFECT_WORKERS = int(os.getenv("MAX_PREFECT_WORKERS", 4)) -print(f"{'MAX_PREFECT_WORKERS':<40}", MAX_PREFECT_WORKERS) +ALL_CONFIG["MAX_PREFECT_WORKERS"] = MAX_PREFECT_WORKERS # Durée avant l'expiration du cache des ressources (en heure). Défaut : 168 (7 jours) CACHE_EXPIRATION_TIME_HOURS = int(os.getenv("CACHE_EXPIRATION_TIME_HOURS", 168)) -print(f"{'CACHE_EXPIRATION_TIME_HOURS':<40}", CACHE_EXPIRATION_TIME_HOURS) +ALL_CONFIG["CACHE_EXPIRATION_TIME_HOURS"] = CACHE_EXPIRATION_TIME_HOURS DATE_NOW = datetime.now().isoformat()[0:10] # YYYY-MM-DD @@ -47,14 +42,14 @@ def make_path_from_env(env: str, alternative_path: Path) -> Path: # Publication ou non des fichiers produits sur data.gouv.fr DECP_PROCESSING_PUBLISH = os.getenv("DECP_PROCESSING_PUBLISH", "").lower() == "true" -print(f"{'DECP_PROCESSING_PUBLISH':<40}", DECP_PROCESSING_PUBLISH) +ALL_CONFIG["DECP_PROCESSING_PUBLISH"] = DECP_PROCESSING_PUBLISH # Client HTTP HTTP_CLIENT = httpx.Client() HTTP_HEADERS = { "Connection": "keep-alive", - "User-agent": "Projet : https://decp.info/a-propos | Client HTTP : https://pypi.org/project/httpx/", + "User-agent": "decp.info", } # Timeout pour la publication de chaque ressource sur data.gouv.fr @@ -77,21 +72,22 @@ def make_path_from_env(env: str, alternative_path: Path) -> Path: # Dossier racine BASE_DIR = make_path_from_env("DECP_BASE_DIR", Path(__file__).absolute().parent.parent) -print(f"{'BASE_DIR':<40}", BASE_DIR) +ALL_CONFIG["BASE_DIR"] = BASE_DIR # Les variables configurées sur le serveur doivent avoir la priorité DATA_DIR = make_path_from_env("DECP_DATA_DIR", BASE_DIR / "data") DATA_DIR.mkdir(exist_ok=True, parents=True) -print(f"{'DATA_DIR':<40}", DATA_DIR) +ALL_CONFIG["DATA_DIR"] = DATA_DIR RESOURCE_CACHE_DIR = make_path_from_env( "RESOURCE_CACHE_DIR", DATA_DIR / "resource_cache" ) RESOURCE_CACHE_DIR.mkdir(exist_ok=True, parents=True) +ALL_CONFIG["RESOURCE_CACHE_DIR"] = RESOURCE_CACHE_DIR DIST_DIR = make_path_from_env("DECP_DIST_DIR", BASE_DIR / "dist") DIST_DIR.mkdir(exist_ok=True, parents=True, mode=777) -print(f"{'DIST_DIR':<40}", DIST_DIR) +ALL_CONFIG["DIST_DIR"] = DIST_DIR def make_sirene_data_dir(sirene_data_parent_dir) -> Path: @@ -116,19 +112,19 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path: SIRENE_DATA_DIR = Path(os.path.join(BASE_DIR, SIRENE_DATA_DIR)) # SIRENE_DATA_DIR on ne le crée que si nécessaire, dans flows.py -print(f"{'SIRENE_DATA_PARENT_DIR':<40}", SIRENE_DATA_PARENT_DIR) -print(f"{'SIRENE_DATA_DIR':<40}", SIRENE_DATA_DIR) +ALL_CONFIG["SIRENE_DATA_PARENT_DIR"] = SIRENE_DATA_PARENT_DIR +ALL_CONFIG["SIRENE_DATA_DIR"] = SIRENE_DATA_DIR SIRENE_UNITES_LEGALES_URL = os.getenv("SIRENE_UNITES_LEGALES_URL", "") # Mode de scraping SCRAPING_MODE = os.getenv("SCRAPING_MODE", "month") -print(f"{'SCRAPING_MODE':<40}", SCRAPING_MODE) +ALL_CONFIG["SCRAPING_MODE"] = SCRAPING_MODE # Target (plateforme cible pour le scraping) SCRAPING_TARGET = os.getenv("SCRAPING_TARGET") -print(f"{'SCRAPING_TARGET':<40}", SCRAPING_TARGET) +ALL_CONFIG["SCRAPING_TARGET"] = SCRAPING_TARGET # Lecture ou non des ressource en cache DECP_USE_CACHE = os.getenv("DECP_USE_CACHE", "false").lower() == "true" @@ -138,6 +134,7 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path: # Données de référence REFERENCE_DIR = BASE_DIR / "reference" +ALL_CONFIG["REFERENCE_DIR"] = REFERENCE_DIR # Liste et ordre des colonnes pour le mono dataframe de base (avant normalisation et spécialisation) # Sert aussi à vérifier qu'au moins ces colonnes sont présentes (d'autres peuvent être présentes en plus, les colonnes "innatendues") @@ -169,7 +166,7 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path: # Liste des ID de ressources présentes dans un dataset à traiter, au format JSON ou XML, mais exclues du traitement EXCLUDED_RESOURCES = os.getenv("EXCLUDED_RESOURCES", "").replace(" ", "") -print(f"{'EXCLUDED_RESOURCES':<40}", EXCLUDED_RESOURCES) +ALL_CONFIG["EXCLUDED_RESOURCES"] = EXCLUDED_RESOURCES EXCLUDED_RESOURCES = EXCLUDED_RESOURCES.split(",") EXCLUDED_RESOURCES = ( @@ -191,7 +188,7 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path: # Ne traiter qu'un seul dataset identifier par son ID SOLO_DATASET = os.getenv("SOLO_DATASET", "") -print(f"{'SOLO_DATASET':<40}", SOLO_DATASET) +ALL_CONFIG["SOLO_DATASET"] = SOLO_DATASET with open( make_path_from_env( @@ -212,6 +209,3 @@ class DecpFormat: prefixe_json_marches: str liste_marches_ijson: sendable_list | None = None coroutine_ijson: Coroutine | None = None - - -print("") diff --git a/src/flows/decp_processing.py b/src/flows/decp_processing.py index b101620..b838bcc 100644 --- a/src/flows/decp_processing.py +++ b/src/flows/decp_processing.py @@ -1,17 +1,17 @@ import os import shutil +from concurrent.futures import ThreadPoolExecutor import polars as pl import polars.selectors as cs -from prefect import flow +from prefect import flow, task from prefect.artifacts import create_table_artifact from prefect.context import get_run_context -from prefect.task_runners import ConcurrentTaskRunner from prefect_email import EmailServerCredentials, email_send_message from src.config import ( + ALL_CONFIG, BASE_DF_COLUMNS, - BASE_DIR, DATE_NOW, DECP_PROCESSING_PUBLISH, DIST_DIR, @@ -33,15 +33,19 @@ concat_parquet_files, sort_columns, ) -from src.tasks.utils import full_resource_name, generate_stats, remove_unused_cache +from src.tasks.utils import ( + full_resource_name, + generate_stats, + print_all_config, + remove_unused_cache, +) -@flow( - log_prints=True, - task_runner=ConcurrentTaskRunner(max_workers=MAX_PREFECT_WORKERS), -) +@flow(log_prints=True) def decp_processing(enable_cache_removal: bool = True): - print(f"🚀 Début du flow decp-processing dans base dir {BASE_DIR} ") + print("🚀 Début du flow decp-processing") + + print_all_config(ALL_CONFIG) print("Liste de toutes les ressources des datasets...") resources: list[dict] = list_resources(TRACKED_DATASETS) @@ -53,42 +57,24 @@ def decp_processing(enable_cache_removal: bool = True): available_parquet_files = set(os.listdir(RESOURCE_CACHE_DIR)) # Traitement parallèle des ressources par lots pour éviter la surcharge mémoire - batch_size = 500 + batch_size = 100 parquet_files = [] - # Filtrer les ressources à traiter + # Filtrer les ressources à traiter, en ne gardant que les fichiers > 100 octets resources_to_process = [r for r in resources if r["filesize"] > 100] - # Afin d'être sûr que je ne publie pas par erreur un jeu de données de test - decp_publish = DECP_PROCESSING_PUBLISH and len(resources_to_process) > 5000 - for i in range(0, len(resources_to_process), batch_size): - batch = resources_to_process[i : i + batch_size] - print( - f"🗃️ Traitement du lot {i // batch_size + 1} / {len(resources_to_process) // batch_size + 1}" + process_batch( + available_parquet_files, + batch_size, + i, + parquet_files, + resources_artifact, + resources_to_process, ) - futures = {} - for resource in batch: - future = get_clean.submit( - resource, resources_artifact, available_parquet_files - ) - futures[future] = full_resource_name(resource) - - for f in futures: - try: - result = f.result() - if result is not None: - parquet_files.append(result) - except Exception as e: - resource_name = futures[f] - print( - f"❌ Erreur de traitement de {resource_name} ({type(e).__name__}):" - ) - print(e) - - # Nettoyage explicite - futures.clear() + # Afin d'être sûr que je ne publie pas par erreur un jeu de données de test + decp_publish = DECP_PROCESSING_PUBLISH and len(resources_to_process) > 5000 if decp_publish: create_table_artifact( @@ -150,6 +136,40 @@ def decp_processing(enable_cache_removal: bool = True): print("☑️ Fin du flow principal decp_processing.") +@task(retries=2) +def process_batch( + available_parquet_files, + batch_size, + i, + parquet_files, + resources_artifact, + resources_to_process, +): + batch = resources_to_process[i : i + batch_size] + print( + f"🗃️ Traitement du lot {i // batch_size + 1} / {len(resources_to_process) // batch_size + 1}" + ) + futures = {} + with ThreadPoolExecutor(max_workers=MAX_PREFECT_WORKERS) as executor: + for resource in batch: + future = executor.submit( + get_clean, resource, resources_artifact, available_parquet_files + ) + futures[future] = full_resource_name(resource) + + for future in futures: + try: + result = future.result() + if result is not None: + parquet_files.append(result) + except Exception as e: + resource_name = futures[future] + print(f"❌ Erreur de traitement de {resource_name} ({type(e).__name__}):") + print(e) + # Nettoyage explicite + futures.clear() + + @sirene_preprocess.on_failure @decp_processing.on_failure def notify_exception_by_email(flow, flow_run, state): diff --git a/src/tasks/enrich.py b/src/tasks/enrich.py index 9fad829..86d1007 100644 --- a/src/tasks/enrich.py +++ b/src/tasks/enrich.py @@ -1,7 +1,6 @@ import polars as pl import polars.selectors as cs from polars_ds import haversine -from prefect import task from src.config import SIRENE_DATA_DIR from src.tasks.transform import ( @@ -29,7 +28,10 @@ def add_etablissement_data( # Si il y a un etablissement_nom (Enseigne1Etablissement ou denominationUsuelleEtablissement), # on l'ajoute au nom de l'organisme, entre parenthèses lf_sirets = lf_sirets.with_columns( - pl.when(pl.col("etablissement_nom").is_not_null()) + pl.when( + pl.col("etablissement_nom").is_not_null() + & (pl.col("etablissement_nom") != pl.col(f"{type_siret}_nom")) + ) .then( pl.concat_str( pl.col(f"{type_siret}_nom"), @@ -76,7 +78,6 @@ def add_unite_legale_data( return lf_sirets -@task(log_prints=True) def enrich_from_sirene(lf: pl.LazyFrame): # Récupération des données SIRET/SIREN préparées dans sirene-preprocess() lf_etablissements = pl.scan_parquet(SIRENE_DATA_DIR / "etablissements.parquet") diff --git a/src/tasks/get.py b/src/tasks/get.py index 8530cc3..c472a7c 100644 --- a/src/tasks/get.py +++ b/src/tasks/get.py @@ -11,8 +11,13 @@ import polars as pl from httpx import Client, HTTPStatusError, TimeoutException, get from lxml import etree, html -from prefect import task from prefect.transactions import transaction +from tenacity import ( + retry, + retry_if_exception_type, + stop_after_attempt, + wait_exponential, +) from src.config import ( DATA_DIR, @@ -39,7 +44,11 @@ ) -@task(retries=3, retry_delay_seconds=3) +@retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=1, max=10), + retry=retry_if_exception_type(httpx.HTTPError), # On ne retry que sur erreur http +) def stream_get(url: str, chunk_size=1024**2): # chunk_size en octets (1 Mo par défaut) if url.startswith("http"): try: @@ -58,7 +67,6 @@ def stream_get(url: str, chunk_size=1024**2): # chunk_size en octets (1 Mo par yield chunk -@task(persist_result=False) def get_resource( r: dict, resources_artifact: list[dict] | list ) -> tuple[pl.LazyFrame | None, DecpFormat | None]: @@ -128,7 +136,6 @@ def find_json_decp_format(chunk, decp_formats, resource: dict): return None -@task(persist_result=False, log_prints=True) def json_stream_to_parquet( url: str, output_path: Path, resource: dict ) -> tuple[set, DecpFormat or None]: @@ -202,7 +209,6 @@ def json_stream_to_parquet( return fields, decp_format -@task(persist_result=False) def xml_stream_to_parquet( url: str, output_path: Path, fix_chars=False ) -> tuple[set, DecpFormat]: @@ -437,12 +443,6 @@ def get_insee_cog_data(url, schema_overrides, columns) -> pl.DataFrame: return df_insee -@task( - log_prints=True, - # persist_result=True, - # cache_expiration=datetime.timedelta(hours=CACHE_EXPIRATION_TIME_HOURS), - # cache_key_fn=get_clean_cache_key, -) def get_clean( resource, resources_artifact: list, available_parquet_files: set ) -> pl.DataFrame or None: @@ -474,7 +474,6 @@ def get_clean( return parquet_path.with_suffix(".parquet") -@task def get_unite_legales(processed_parquet_path): print("Téléchargement des données unité légales et sélection des colonnes...") ( diff --git a/src/tasks/utils.py b/src/tasks/utils.py index 9757c20..7934894 100644 --- a/src/tasks/utils.py +++ b/src/tasks/utils.py @@ -60,14 +60,6 @@ def remove_sirene_data_dir(transaction): # -def get_clean_cache_key(context, parameters) -> str: - resource = parameters["resource"] - - # On utilise le hash sha1 de la ressource, généré par data.gouv.fr, comme clé de cache - return resource["checksum"] - - -@task def remove_unused_cache( cache_dir: Path = RESOURCE_CACHE_DIR, cache_expiration_time_hours: int = CACHE_EXPIRATION_TIME_HOURS, @@ -82,7 +74,7 @@ def remove_unused_cache( print(f"Suppression du fichier de cache: {file}") deleted_files.append(file) file.unlink() - print(f"{len(deleted_files)} fichiers supprimés") + print(f"-> {len(deleted_files)} fichiers supprimés") # @@ -341,3 +333,10 @@ def check_parquet_file(path) -> bool: return result except (FileNotFoundError, pl.exceptions.ComputeError): return False + + +def print_all_config(all_config): + msg = "" + for k, v in sorted(all_config.items()): + msg += f"\n{k}: {v}" + print(msg)