From 59cdc4e64fe63d317f18f12280391045ee049832 Mon Sep 17 00:00:00 2001
From: Colin Maudry
Date: Mon, 15 Dec 2025 16:24:09 +0100
Subject: [PATCH 1/8] Fewer prefect.task decorators to lighten Prefect's
 database
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 src/tasks/get.py | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/src/tasks/get.py b/src/tasks/get.py
index 8530cc3..08ac31d 100644
--- a/src/tasks/get.py
+++ b/src/tasks/get.py
@@ -39,7 +39,6 @@
 )
 
 
-@task(retries=3, retry_delay_seconds=3)
 def stream_get(url: str, chunk_size=1024**2):  # chunk_size en octets (1 Mo par défaut)
     if url.startswith("http"):
         try:
@@ -58,7 +57,6 @@ def stream_get(url: str, chunk_size=1024**2):  # chunk_size en octets (1 Mo par
             yield chunk
 
 
-@task(persist_result=False)
 def get_resource(
     r: dict, resources_artifact: list[dict] | list
 ) -> tuple[pl.LazyFrame | None, DecpFormat | None]:
@@ -128,7 +126,6 @@ def find_json_decp_format(chunk, decp_formats, resource: dict):
     return None
 
 
-@task(persist_result=False, log_prints=True)
 def json_stream_to_parquet(
     url: str, output_path: Path, resource: dict
 ) -> tuple[set, DecpFormat or None]:
@@ -202,7 +199,6 @@ def json_stream_to_parquet(
     return fields, decp_format
 
 
-@task(persist_result=False)
 def xml_stream_to_parquet(
     url: str, output_path: Path, fix_chars=False
 ) -> tuple[set, DecpFormat]:
@@ -439,7 +435,9 @@ def get_insee_cog_data(url, schema_overrides, columns) -> pl.DataFrame:
 
 @task(
     log_prints=True,
-    # persist_result=True,
+    persist_result=False,
+    retries=3,
+    retry_delay_seconds=3,
     # cache_expiration=datetime.timedelta(hours=CACHE_EXPIRATION_TIME_HOURS),
     # cache_key_fn=get_clean_cache_key,
 )
@@ -474,7 +472,6 @@ def get_clean(
     return parquet_path.with_suffix(".parquet")
 
 
-@task
 def get_unite_legales(processed_parquet_path):
     print("Téléchargement des données unité légales et sélection des colonnes...")
     (

From cfda0efb3835779247404bf26710b1474de09abf Mon Sep 17 00:00:00 2001
From: Colin Maudry
Date: Mon, 15 Dec 2025 16:59:15 +0100
Subject: [PATCH 2/8] Shorter user agent

---
 src/config.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/config.py b/src/config.py
index 5d29c52..22e00dc 100644
--- a/src/config.py
+++ b/src/config.py
@@ -54,7 +54,7 @@ def make_path_from_env(env: str, alternative_path: Path) -> Path:
 HTTP_CLIENT = httpx.Client()
 HTTP_HEADERS = {
     "Connection": "keep-alive",
-    "User-agent": "Projet : https://decp.info/a-propos | Client HTTP : https://pypi.org/project/httpx/",
+    "User-agent": "decp.info",
 }
 
 # Timeout pour la publication de chaque ressource sur data.gouv.fr
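Note: patch 1 works because Prefect only records task runs for decorated functions; a plain function called from a flow leaves no trace in the backend database. A minimal sketch of the trade-off, not from the repository (tracked, untracked and demo are hypothetical names):

# Every call to a @task-decorated function creates a task run that Prefect
# persists in its backend database; a plain helper does not.
from prefect import flow, task

@task
def tracked(x: int) -> int:  # each call = one task-run record in Prefect's DB
    return x * 2

def untracked(x: int) -> int:  # plain helper: no orchestration overhead
    return x * 2

@flow
def demo():
    # 1 000 tracked() calls would write 1 000 task runs; untracked() writes none.
    return [untracked(x) for x in range(1000)]

The cost is losing per-call retries, caching and observability, which the following patches reintroduce at coarser granularity.
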
From 23ef522fde7748775b2ca242205a8ebec6523ad5 Mon Sep 17 00:00:00 2001
From: Colin Maudry
Date: Mon, 15 Dec 2025 16:59:59 +0100
Subject: [PATCH 3/8] Use tenacity rather than prefect to retry HTTP stream
 gets
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 pyproject.toml   |  3 ++-
 src/tasks/get.py | 11 +++++++++++
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index 73748ee..a90ca1c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -22,7 +22,8 @@ dependencies = [
     "bs4",
     "selenium",
     "polars_ds",
-    "scikit-learn"
+    "scikit-learn",
+    "tenacity"
 ]
 
 [project.optional-dependencies]
diff --git a/src/tasks/get.py b/src/tasks/get.py
index 08ac31d..26e2fe9 100644
--- a/src/tasks/get.py
+++ b/src/tasks/get.py
@@ -13,6 +13,12 @@
 from lxml import etree, html
 from prefect import task
 from prefect.transactions import transaction
+from tenacity import (
+    retry,
+    retry_if_exception_type,
+    stop_after_attempt,
+    wait_exponential,
+)
 
 from src.config import (
     DATA_DIR,
@@ -39,6 +45,11 @@
 )
 
 
+@retry(
+    stop=stop_after_attempt(3),
+    wait=wait_exponential(multiplier=1, min=1, max=10),
+    retry=retry_if_exception_type(httpx.HTTPError),  # On ne retry que sur erreur http
+)
 def stream_get(url: str, chunk_size=1024**2):  # chunk_size en octets (1 Mo par défaut)
     if url.startswith("http"):
         try:
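Note: a self-contained sketch of the tenacity policy patch 3 applies to stream_get, with the same parameters (fetch is a hypothetical stand-in, not from the repo). stop_after_attempt(3) allows at most three tries, and wait_exponential(multiplier=1, min=1, max=10) produces waits that roughly double from about one second, clamped to at most ten:

import httpx
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    # httpx.HTTPError covers both transport errors (timeouts, connection
    # resets) and HTTPStatusError; anything else fails immediately.
    retry=retry_if_exception_type(httpx.HTTPError),
)
def fetch(url: str) -> bytes:
    response = httpx.get(url)
    response.raise_for_status()  # raises httpx.HTTPStatusError on 4xx/5xx
    return response.content

Unlike the Prefect retries removed in patch 1, these retries happen in-process and write nothing to Prefect's database.
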
From d1efa069bc18261e6ebd6d5b00327abab362d857 Mon Sep 17 00:00:00 2001
From: Colin Maudry
Date: Mon, 15 Dec 2025 17:24:05 +0100
Subject: [PATCH 4/8] Only append the (establishment name) when it differs
 from the legal unit name
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 src/tasks/enrich.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/src/tasks/enrich.py b/src/tasks/enrich.py
index 9fad829..86d1007 100644
--- a/src/tasks/enrich.py
+++ b/src/tasks/enrich.py
@@ -1,7 +1,6 @@
 import polars as pl
 import polars.selectors as cs
 from polars_ds import haversine
-from prefect import task
 
 from src.config import SIRENE_DATA_DIR
 from src.tasks.transform import (
@@ -29,7 +28,10 @@ def add_etablissement_data(
     # Si il y a un etablissement_nom (Enseigne1Etablissement ou denominationUsuelleEtablissement),
     # on l'ajoute au nom de l'organisme, entre parenthèses
     lf_sirets = lf_sirets.with_columns(
-        pl.when(pl.col("etablissement_nom").is_not_null())
+        pl.when(
+            pl.col("etablissement_nom").is_not_null()
+            & (pl.col("etablissement_nom") != pl.col(f"{type_siret}_nom"))
+        )
         .then(
             pl.concat_str(
                 pl.col(f"{type_siret}_nom"),
@@ -76,7 +78,6 @@ def add_unite_legale_data(
     return lf_sirets
 
 
-@task(log_prints=True)
 def enrich_from_sirene(lf: pl.LazyFrame):
     # Récupération des données SIRET/SIREN préparées dans sirene-preprocess()
     lf_etablissements = pl.scan_parquet(SIRENE_DATA_DIR / "etablissements.parquet")
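Note: a runnable sketch of the conditional-concatenation pattern patch 4 tightens, on illustrative data (the acheteur_nom column name and separators are assumptions, not taken from the repo):

import polars as pl

df = pl.DataFrame({
    "acheteur_nom": ["Ville de Nantes", "Ville de Lyon", "CCAS de Lille"],
    "etablissement_nom": [None, "Ville de Lyon", "Restauration"],
})

df = df.with_columns(
    # Append " (etablissement)" only when it exists AND differs from the
    # legal-unit name, otherwise keep the name unchanged.
    pl.when(
        pl.col("etablissement_nom").is_not_null()
        & (pl.col("etablissement_nom") != pl.col("acheteur_nom"))
    )
    .then(
        pl.concat_str(
            pl.col("acheteur_nom"),
            pl.lit(" ("),
            pl.col("etablissement_nom"),
            pl.lit(")"),
        )
    )
    .otherwise(pl.col("acheteur_nom"))
    .alias("acheteur_nom")
)
# -> "Ville de Nantes" and "Ville de Lyon" unchanged,
#    "CCAS de Lille" becomes "CCAS de Lille (Restauration)"
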
From 1048cbe2133abcfdf795be347273448110915019 Mon Sep 17 00:00:00 2001
From: Colin Maudry
Date: Mon, 15 Dec 2025 17:30:54 +0100
Subject: [PATCH 5/8] get_clean is no longer a task; one task per 100
 resources
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 src/config.py                |  2 +
 src/flows/decp_processing.py | 72 +++++++++++++++++++++---------------
 src/tasks/utils.py           | 10 +----
 3 files changed, 45 insertions(+), 39 deletions(-)

diff --git a/src/config.py b/src/config.py
index 22e00dc..87c3d43 100644
--- a/src/config.py
+++ b/src/config.py
@@ -88,6 +88,7 @@ def make_path_from_env(env: str, alternative_path: Path) -> Path:
     "RESOURCE_CACHE_DIR", DATA_DIR / "resource_cache"
 )
 RESOURCE_CACHE_DIR.mkdir(exist_ok=True, parents=True)
+print(f"{'RESOURCE_CACHE_DIR':<40}", RESOURCE_CACHE_DIR)
 
 DIST_DIR = make_path_from_env("DECP_DIST_DIR", BASE_DIR / "dist")
 DIST_DIR.mkdir(exist_ok=True, parents=True, mode=777)
@@ -138,6 +139,7 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path:
 
 # Données de référence
 REFERENCE_DIR = BASE_DIR / "reference"
+print(f"{'REFERENCE_DIR':<40}", REFERENCE_DIR)
 
 # Liste et ordre des colonnes pour le mono dataframe de base (avant normalisation et spécialisation)
 # Sert aussi à vérifier qu'au moins ces colonnes sont présentes (d'autres peuvent être présentes en plus, les colonnes "innatendues")
diff --git a/src/flows/decp_processing.py b/src/flows/decp_processing.py
index b101620..70d0c85 100644
--- a/src/flows/decp_processing.py
+++ b/src/flows/decp_processing.py
@@ -3,7 +3,7 @@
 
 import polars as pl
 import polars.selectors as cs
-from prefect import flow
+from prefect import flow, task
 from prefect.artifacts import create_table_artifact
 from prefect.context import get_run_context
 from prefect.task_runners import ConcurrentTaskRunner
@@ -53,42 +53,24 @@ def decp_processing(enable_cache_removal: bool = True):
     available_parquet_files = set(os.listdir(RESOURCE_CACHE_DIR))
 
     # Traitement parallèle des ressources par lots pour éviter la surcharge mémoire
-    batch_size = 500
+    batch_size = 100
     parquet_files = []
 
-    # Filtrer les ressources à traiter
+    # Filtrer les ressources à traiter, en ne gardant que les fichiers > 100 octets
     resources_to_process = [r for r in resources if r["filesize"] > 100]
 
-    # Afin d'être sûr que je ne publie pas par erreur un jeu de données de test
-    decp_publish = DECP_PROCESSING_PUBLISH and len(resources_to_process) > 5000
-
     for i in range(0, len(resources_to_process), batch_size):
-        batch = resources_to_process[i : i + batch_size]
-        print(
-            f"🗃️ Traitement du lot {i // batch_size + 1} / {len(resources_to_process) // batch_size + 1}"
+        process_batch(
+            available_parquet_files,
+            batch_size,
+            i,
+            parquet_files,
+            resources_artifact,
+            resources_to_process,
         )
-        futures = {}
-        for resource in batch:
-            future = get_clean.submit(
-                resource, resources_artifact, available_parquet_files
-            )
-            futures[future] = full_resource_name(resource)
-
-        for f in futures:
-            try:
-                result = f.result()
-                if result is not None:
-                    parquet_files.append(result)
-            except Exception as e:
-                resource_name = futures[f]
-                print(
-                    f"❌ Erreur de traitement de {resource_name} ({type(e).__name__}):"
-                )
-                print(e)
-
-        # Nettoyage explicite
-        futures.clear()
+    # Afin d'être sûr que je ne publie pas par erreur un jeu de données de test
+    decp_publish = DECP_PROCESSING_PUBLISH and len(resources_to_process) > 5000
 
     if decp_publish:
         create_table_artifact(
@@ -150,6 +132,36 @@ def decp_processing(enable_cache_removal: bool = True):
     print("☑️ Fin du flow principal decp_processing.")
 
 
+@task(retries=2)
+def process_batch(
+    available_parquet_files,
+    batch_size,
+    i,
+    parquet_files,
+    resources_artifact,
+    resources_to_process,
+):
+    batch = resources_to_process[i : i + batch_size]
+    print(
+        f"🗃️ Traitement du lot {i // batch_size + 1} / {len(resources_to_process) // batch_size + 1}"
+    )
+    futures = {}
+    for resource in batch:
+        future = get_clean.submit(resource, resources_artifact, available_parquet_files)
+        futures[future] = full_resource_name(resource)
+    for f in futures:
+        try:
+            result = f.result()
+            if result is not None:
+                parquet_files.append(result)
+        except Exception as e:
+            resource_name = futures[f]
+            print(f"❌ Erreur de traitement de {resource_name} ({type(e).__name__}):")
+            print(e)
+    # Nettoyage explicite
+    futures.clear()
+
+
 @sirene_preprocess.on_failure
 @decp_processing.on_failure
 def notify_exception_by_email(flow, flow_run, state):
diff --git a/src/tasks/utils.py b/src/tasks/utils.py
index 9757c20..03bec5c 100644
--- a/src/tasks/utils.py
+++ b/src/tasks/utils.py
@@ -60,14 +60,6 @@ def remove_sirene_data_dir(transaction):
 #
 
 
-def get_clean_cache_key(context, parameters) -> str:
-    resource = parameters["resource"]
-
-    # On utilise le hash sha1 de la ressource, généré par data.gouv.fr, comme clé de cache
-    return resource["checksum"]
-
-
-@task
 def remove_unused_cache(
     cache_dir: Path = RESOURCE_CACHE_DIR,
     cache_expiration_time_hours: int = CACHE_EXPIRATION_TIME_HOURS,
@@ -82,7 +74,7 @@ def remove_unused_cache(
             print(f"Suppression du fichier de cache: {file}")
             deleted_files.append(file)
             file.unlink()
-    print(f"{len(deleted_files)} fichiers supprimés")
+    print(f"-> {len(deleted_files)} fichiers supprimés")
 
 
 #
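Note: patch 5 changes the task granularity from one Prefect task run per resource to one per batch of 100 resources. The batching arithmetic behind process_batch, as a standalone sketch (iter_batches and the sizes are illustrative, not repo code):

def iter_batches(items: list, batch_size: int = 100):
    # range(0, len, step) yields the start index of each slice; the final
    # slice is simply shorter when len(items) is not a multiple of batch_size.
    for i in range(0, len(items), batch_size):
        yield items[i : i + batch_size]

resources = list(range(950))  # e.g. 950 resources -> 9 full batches + 1 of 50
for batch_number, batch in enumerate(iter_batches(resources), start=1):
    print(f"lot {batch_number}: {len(batch)} ressources")

With thousands of resources, this divides the number of task runs written to Prefect's database by roughly 100 while keeping retries (retries=2) at the batch level.
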
From c27b404126baf79bdce5173d282643f1abaa961e Mon Sep 17 00:00:00 2001
From: Colin Maudry
Date: Mon, 15 Dec 2025 18:08:45 +0100
Subject: [PATCH 6/8] Print the config at the start of a decp_processing run
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 src/config.py                | 38 ++++++++++++++----------------------
 src/flows/decp_processing.py | 13 +++++++++---
 src/tasks/utils.py           |  7 +++++++
 3 files changed, 32 insertions(+), 26 deletions(-)

diff --git a/src/config.py b/src/config.py
index 87c3d43..93583ad 100644
--- a/src/config.py
+++ b/src/config.py
@@ -26,20 +26,15 @@ def make_path_from_env(env: str, alternative_path: Path) -> Path:
     return Path(os.getenv(env) or alternative_path)
 
 
-print("""
-##########
-# Config #
-##########
-""")
-
+ALL_CONFIG = {}
 
 
 # Nombre maximal de workers utilisables par Prefect. Défaut : 16
 MAX_PREFECT_WORKERS = int(os.getenv("MAX_PREFECT_WORKERS", 4))
-print(f"{'MAX_PREFECT_WORKERS':<40}", MAX_PREFECT_WORKERS)
+ALL_CONFIG["MAX_PREFECT_WORKERS"] = MAX_PREFECT_WORKERS
 
 # Durée avant l'expiration du cache des ressources (en heure). Défaut : 168 (7 jours)
 CACHE_EXPIRATION_TIME_HOURS = int(os.getenv("CACHE_EXPIRATION_TIME_HOURS", 168))
-print(f"{'CACHE_EXPIRATION_TIME_HOURS':<40}", CACHE_EXPIRATION_TIME_HOURS)
+ALL_CONFIG["CACHE_EXPIRATION_TIME_HOURS"] = CACHE_EXPIRATION_TIME_HOURS
 
 DATE_NOW = datetime.now().isoformat()[0:10]  # YYYY-MM-DD
@@ -47,7 +42,7 @@ def make_path_from_env(env: str, alternative_path: Path) -> Path:
 
 # Publication ou non des fichiers produits sur data.gouv.fr
 DECP_PROCESSING_PUBLISH = os.getenv("DECP_PROCESSING_PUBLISH", "").lower() == "true"
-print(f"{'DECP_PROCESSING_PUBLISH':<40}", DECP_PROCESSING_PUBLISH)
+ALL_CONFIG["DECP_PROCESSING_PUBLISH"] = DECP_PROCESSING_PUBLISH
 
 
 # Client HTTP
@@ -77,22 +72,22 @@ def make_path_from_env(env: str, alternative_path: Path) -> Path:
 
 # Dossier racine
 BASE_DIR = make_path_from_env("DECP_BASE_DIR", Path(__file__).absolute().parent.parent)
-print(f"{'BASE_DIR':<40}", BASE_DIR)
+ALL_CONFIG["BASE_DIR"] = BASE_DIR
 
 # Les variables configurées sur le serveur doivent avoir la priorité
 DATA_DIR = make_path_from_env("DECP_DATA_DIR", BASE_DIR / "data")
 DATA_DIR.mkdir(exist_ok=True, parents=True)
-print(f"{'DATA_DIR':<40}", DATA_DIR)
+ALL_CONFIG["DATA_DIR"] = DATA_DIR
 
 RESOURCE_CACHE_DIR = make_path_from_env(
     "RESOURCE_CACHE_DIR", DATA_DIR / "resource_cache"
 )
 RESOURCE_CACHE_DIR.mkdir(exist_ok=True, parents=True)
-print(f"{'RESOURCE_CACHE_DIR':<40}", RESOURCE_CACHE_DIR)
+ALL_CONFIG["RESOURCE_CACHE_DIR"] = RESOURCE_CACHE_DIR
 
 DIST_DIR = make_path_from_env("DECP_DIST_DIR", BASE_DIR / "dist")
 DIST_DIR.mkdir(exist_ok=True, parents=True, mode=777)
-print(f"{'DIST_DIR':<40}", DIST_DIR)
+ALL_CONFIG["DIST_DIR"] = DIST_DIR
 
 
 def make_sirene_data_dir(sirene_data_parent_dir) -> Path:
@@ -117,19 +112,19 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path:
     SIRENE_DATA_DIR = Path(os.path.join(BASE_DIR, SIRENE_DATA_DIR))
 
 # SIRENE_DATA_DIR on ne le crée que si nécessaire, dans flows.py
-print(f"{'SIRENE_DATA_PARENT_DIR':<40}", SIRENE_DATA_PARENT_DIR)
-print(f"{'SIRENE_DATA_DIR':<40}", SIRENE_DATA_DIR)
+ALL_CONFIG["SIRENE_DATA_PARENT_DIR"] = SIRENE_DATA_PARENT_DIR
+ALL_CONFIG["SIRENE_DATA_DIR"] = SIRENE_DATA_DIR
 
 SIRENE_UNITES_LEGALES_URL = os.getenv("SIRENE_UNITES_LEGALES_URL", "")
 
 
 # Mode de scraping
 SCRAPING_MODE = os.getenv("SCRAPING_MODE", "month")
-print(f"{'SCRAPING_MODE':<40}", SCRAPING_MODE)
+ALL_CONFIG["SCRAPING_MODE"] = SCRAPING_MODE
 
 # Target (plateforme cible pour le scraping)
 SCRAPING_TARGET = os.getenv("SCRAPING_TARGET")
-print(f"{'SCRAPING_TARGET':<40}", SCRAPING_TARGET)
+ALL_CONFIG["SCRAPING_TARGET"] = SCRAPING_TARGET
 
 # Lecture ou non des ressource en cache
 DECP_USE_CACHE = os.getenv("DECP_USE_CACHE", "false").lower() == "true"
@@ -139,7 +134,7 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path:
 
 # Données de référence
 REFERENCE_DIR = BASE_DIR / "reference"
-print(f"{'REFERENCE_DIR':<40}", REFERENCE_DIR)
+ALL_CONFIG["REFERENCE_DIR"] = REFERENCE_DIR
 
 # Liste et ordre des colonnes pour le mono dataframe de base (avant normalisation et spécialisation)
 # Sert aussi à vérifier qu'au moins ces colonnes sont présentes (d'autres peuvent être présentes en plus, les colonnes "innatendues")
@@ -171,7 +166,7 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path:
 
 # Liste des ID de ressources présentes dans un dataset à traiter, au format JSON ou XML, mais exclues du traitement
 EXCLUDED_RESOURCES = os.getenv("EXCLUDED_RESOURCES", "").replace(" ", "")
-print(f"{'EXCLUDED_RESOURCES':<40}", EXCLUDED_RESOURCES)
+ALL_CONFIG["EXCLUDED_RESOURCES"] = EXCLUDED_RESOURCES
 
 EXCLUDED_RESOURCES = EXCLUDED_RESOURCES.split(",")
 EXCLUDED_RESOURCES = (
@@ -193,7 +188,7 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path:
 
 # Ne traiter qu'un seul dataset identifier par son ID
 SOLO_DATASET = os.getenv("SOLO_DATASET", "")
-print(f"{'SOLO_DATASET':<40}", SOLO_DATASET)
+ALL_CONFIG["SOLO_DATASET"] = SOLO_DATASET
 
 with open(
     make_path_from_env(
@@ -214,6 +209,3 @@ class DecpFormat:
     prefixe_json_marches: str
     liste_marches_ijson: sendable_list | None = None
     coroutine_ijson: Coroutine | None = None
-
-
-print("")
diff --git a/src/flows/decp_processing.py b/src/flows/decp_processing.py
index 70d0c85..e020786 100644
--- a/src/flows/decp_processing.py
+++ b/src/flows/decp_processing.py
@@ -10,8 +10,8 @@
 from prefect_email import EmailServerCredentials, email_send_message
 
 from src.config import (
+    ALL_CONFIG,
     BASE_DF_COLUMNS,
-    BASE_DIR,
     DATE_NOW,
     DECP_PROCESSING_PUBLISH,
     DIST_DIR,
@@ -33,7 +33,12 @@
     concat_parquet_files,
     sort_columns,
 )
-from src.tasks.utils import full_resource_name, generate_stats, remove_unused_cache
+from src.tasks.utils import (
+    full_resource_name,
+    generate_stats,
+    print_all_config,
+    remove_unused_cache,
+)
 
 
 @flow(
@@ -41,7 +46,9 @@
     task_runner=ConcurrentTaskRunner(max_workers=MAX_PREFECT_WORKERS),
 )
 def decp_processing(enable_cache_removal: bool = True):
-    print(f"🚀 Début du flow decp-processing dans base dir {BASE_DIR} ")
+    print("🚀 Début du flow decp-processing")
+
+    print_all_config(ALL_CONFIG)
 
     print("Liste de toutes les ressources des datasets...")
     resources: list[dict] = list_resources(TRACKED_DATASETS)
diff --git a/src/tasks/utils.py b/src/tasks/utils.py
index 03bec5c..7934894 100644
--- a/src/tasks/utils.py
+++ b/src/tasks/utils.py
@@ -333,3 +333,10 @@ def check_parquet_file(path) -> bool:
         return result
     except (FileNotFoundError, pl.exceptions.ComputeError):
         return False
+
+
+def print_all_config(all_config):
+    msg = ""
+    for k, v in sorted(all_config.items()):
+        msg += f"\n{k}: {v}"
+    print(msg)
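Note: the pattern patch 6 adopts is a registry filled at import time, printed in one sorted block when the flow starts instead of scattered print() calls during import. A compact sketch with illustrative values (print_all_config is copied from the diff above):

ALL_CONFIG = {
    "MAX_PREFECT_WORKERS": 4,
    "CACHE_EXPIRATION_TIME_HOURS": 168,
    "DECP_PROCESSING_PUBLISH": False,
}

def print_all_config(all_config):
    msg = ""
    for k, v in sorted(all_config.items()):
        msg += f"\n{k}: {v}"
    print(msg)

print_all_config(ALL_CONFIG)
# (a leading blank line, then:)
# CACHE_EXPIRATION_TIME_HOURS: 168
# DECP_PROCESSING_PUBLISH: False
# MAX_PREFECT_WORKERS: 4
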
From 85a6d2e6dd3db8de65b95fc07b919b0adb38d741 Mon Sep 17 00:00:00 2001
From: Colin Maudry
Date: Mon, 15 Dec 2025 22:33:57 +0100
Subject: [PATCH 7/8] Use Python's standard multithreading so that get_clean
 is no longer a task

---
 src/flows/decp_processing.py | 23 ++++++++++++-----------
 src/tasks/get.py             |  9 ---------
 2 files changed, 12 insertions(+), 20 deletions(-)

diff --git a/src/flows/decp_processing.py b/src/flows/decp_processing.py
index e020786..b838bcc 100644
--- a/src/flows/decp_processing.py
+++ b/src/flows/decp_processing.py
@@ -1,12 +1,12 @@
 import os
 import shutil
+from concurrent.futures import ThreadPoolExecutor
 
 import polars as pl
 import polars.selectors as cs
 from prefect import flow, task
 from prefect.artifacts import create_table_artifact
 from prefect.context import get_run_context
-from prefect.task_runners import ConcurrentTaskRunner
 from prefect_email import EmailServerCredentials, email_send_message
 
 from src.config import (
@@ -41,10 +41,7 @@
 )
 
 
-@flow(
-    log_prints=True,
-    task_runner=ConcurrentTaskRunner(max_workers=MAX_PREFECT_WORKERS),
-)
+@flow(log_prints=True)
 def decp_processing(enable_cache_removal: bool = True):
     print("🚀 Début du flow decp-processing")
 
@@ -153,16 +150,20 @@ def process_batch(
         f"🗃️ Traitement du lot {i // batch_size + 1} / {len(resources_to_process) // batch_size + 1}"
     )
     futures = {}
-    for resource in batch:
-        future = get_clean.submit(resource, resources_artifact, available_parquet_files)
-        futures[future] = full_resource_name(resource)
-    for f in futures:
+    with ThreadPoolExecutor(max_workers=MAX_PREFECT_WORKERS) as executor:
+        for resource in batch:
+            future = executor.submit(
+                get_clean, resource, resources_artifact, available_parquet_files
+            )
+            futures[future] = full_resource_name(resource)
+
+    for future in futures:
         try:
-            result = f.result()
+            result = future.result()
             if result is not None:
                 parquet_files.append(result)
         except Exception as e:
-            resource_name = futures[f]
+            resource_name = futures[future]
             print(f"❌ Erreur de traitement de {resource_name} ({type(e).__name__}):")
             print(e)
     # Nettoyage explicite
diff --git a/src/tasks/get.py b/src/tasks/get.py
index 26e2fe9..c472a7c 100644
--- a/src/tasks/get.py
+++ b/src/tasks/get.py
@@ -11,7 +11,6 @@
 import polars as pl
 from httpx import Client, HTTPStatusError, TimeoutException, get
 from lxml import etree, html
-from prefect import task
 from prefect.transactions import transaction
 from tenacity import (
     retry,
@@ -444,14 +443,6 @@ def get_insee_cog_data(url, schema_overrides, columns) -> pl.DataFrame:
     return df_insee
 
 
-@task(
-    log_prints=True,
-    persist_result=False,
-    retries=3,
-    retry_delay_seconds=3,
-    # cache_expiration=datetime.timedelta(hours=CACHE_EXPIRATION_TIME_HOURS),
-    # cache_key_fn=get_clean_cache_key,
-)
 def get_clean(
     resource, resources_artifact: list, available_parquet_files: set
 ) -> pl.DataFrame or None:
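Note: a self-contained sketch of the concurrent.futures fan-out pattern that replaces Prefect's task runner in patch 7 (work and its inputs are hypothetical, not from the repo):

from concurrent.futures import ThreadPoolExecutor

def work(name: str) -> str:
    if name == "bad":
        raise ValueError("boom")
    return name.upper()

results, futures = [], {}
with ThreadPoolExecutor(max_workers=4) as executor:
    for name in ["a", "b", "bad", "c"]:
        # Map each future back to its input so failures can be reported per item.
        futures[executor.submit(work, name)] = name

# Leaving the with-block waits for all submitted work; exceptions raised in a
# worker thread are re-raised when .result() is called, not at submit time.
for future, name in futures.items():
    try:
        results.append(future.result())
    except Exception as e:
        print(f"❌ {name}: {type(e).__name__}: {e}")

Like the tenacity change in patch 3, this keeps the concurrency in-process, so the parallel get_clean calls no longer generate task runs in Prefect's database.
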
From f72a297f1730ba35e995ee3819edcc08449e711b Mon Sep 17 00:00:00 2001
From: Colin Maudry
Date: Mon, 15 Dec 2025 22:46:06 +0100
Subject: [PATCH 8/8] Changelog 2.6.2

---
 CHANGELOG.md   | 6 ++++++
 README.md      | 2 +-
 pyproject.toml | 2 +-
 3 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 322bf53..473ec5f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,9 @@
+### 2.6.2 2025-12-15
+
+- Réduction du nombre de tâches prefect pour réduire la charge sur la BDD et la latence
+- Utilisation du multithreading standard de Python plutôt que celui de Prefect
+- Le nom d'établissement n'est ajouté entre parenthèses que s'il est différent de celui de l'unité légale
+
 ### 2.6.1 2025-12-14
 
 - Séparation des fichiers de référence et des fichiers de données
diff --git a/README.md b/README.md
index b3ad214..9153400 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
 # DECP processing
 
-> version 2.6.0 ([notes de version](https://github.com/ColinMaudry/decp-processing/blob/main/CHANGELOG.md))
+> version 2.6.2 ([notes de version](https://github.com/ColinMaudry/decp-processing/blob/main/CHANGELOG.md))
 
 Projet de traitement et de publication de meilleures données sur les marchés publics attribués en France. Vous pouvez consulter, filtrer et télécharger ces données sur le site [decp.info](https://decp.info). Enfin la section [À propos](https://decp.info/a-propos) décrit les objectifs du projet et regroupe toutes les informations clés.
diff --git a/pyproject.toml b/pyproject.toml
index a90ca1c..ca90f06 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,7 +1,7 @@
 [project]
 name = "decp-processing"
 description = "Traitement des données des marchés publics français."
-version = "2.5.0"
+version = "2.6.2"
 requires-python = ">= 3.9"
 authors = [
     { name = "Colin Maudry", email = "colin+decp@maudry.com" }