6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,9 @@
### 2.6.2 2025-12-15

- Reduced the number of Prefect tasks to lower the database load and latency
- Switched to standard Python multithreading instead of Prefect's (see the sketch below)
- The establishment name is only appended in parentheses when it differs from the legal unit's name

### 2.6.1 2025-12-14

- Separated reference files from data files
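The multithreading item above boils down to replacing per-resource Prefect task submission with Python's standard `concurrent.futures` pool. The sketch below is only a minimal, self-contained illustration of that pattern, not the project's actual code; the real version is the `process_batch` task in the `src/flows/decp_processing.py` diff further down, and `get_clean` here is just a placeholder for the real processing function.

```python
# Minimal sketch: process resources with Python's standard thread pool
# instead of submitting one Prefect task per resource.
from concurrent.futures import ThreadPoolExecutor

MAX_WORKERS = 4  # stands in for MAX_PREFECT_WORKERS from src/config.py


def get_clean(resource: dict):
    """Placeholder for the real per-resource processing function."""
    return resource.get("id")


def process_resources(resources: list) -> list:
    results = []
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        # Map each future to a human-readable name for error reporting
        futures = {executor.submit(get_clean, r): r.get("id") for r in resources}
        for future in futures:
            try:
                result = future.result()
                if result is not None:
                    results.append(result)
            except Exception as e:
                # One failing resource should not abort the whole batch
                print(f"Error processing {futures[future]}: {type(e).__name__}: {e}")
    return results


if __name__ == "__main__":
    print(process_resources([{"id": "res-1"}, {"id": "res-2"}]))
```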
2 changes: 1 addition & 1 deletion README.md
@@ -1,6 +1,6 @@
# DECP processing

> version 2.6.0 ([notes de version](https://github.com/ColinMaudry/decp-processing/blob/main/CHANGELOG.md))
> version 2.6.2 ([notes de version](https://github.com/ColinMaudry/decp-processing/blob/main/CHANGELOG.md))

Project for processing and publishing better data on public procurement contracts awarded in France. You can browse, filter and download
this data on the [decp.info](https://decp.info) website. The [À propos](https://decp.info/a-propos) section describes the project's goals and gathers all the key information.
5 changes: 3 additions & 2 deletions pyproject.toml
@@ -1,7 +1,7 @@
[project]
name = "decp-processing"
description = "Traitement des données des marchés publics français."
version = "2.5.0"
version = "2.6.2"
requires-python = ">= 3.9"
authors = [
{ name = "Colin Maudry", email = "colin+decp@maudry.com" }
@@ -22,7 +22,8 @@ dependencies = [
"bs4",
"selenium",
"polars_ds",
"scikit-learn"
"scikit-learn",
"tenacity"
]

[project.optional-dependencies]
38 changes: 16 additions & 22 deletions src/config.py
@@ -26,35 +26,30 @@ def make_path_from_env(env: str, alternative_path: Path) -> Path:
return Path(os.getenv(env) or alternative_path)


print("""
##########
# Config #
##########
""")

ALL_CONFIG = {}

# Maximum number of workers usable by Prefect. Default: 4
MAX_PREFECT_WORKERS = int(os.getenv("MAX_PREFECT_WORKERS", 4))
print(f"{'MAX_PREFECT_WORKERS':<40}", MAX_PREFECT_WORKERS)
ALL_CONFIG["MAX_PREFECT_WORKERS"] = MAX_PREFECT_WORKERS

# Time before cached resources expire (in hours). Default: 168 (7 days)
CACHE_EXPIRATION_TIME_HOURS = int(os.getenv("CACHE_EXPIRATION_TIME_HOURS", 168))
print(f"{'CACHE_EXPIRATION_TIME_HOURS':<40}", CACHE_EXPIRATION_TIME_HOURS)
ALL_CONFIG["CACHE_EXPIRATION_TIME_HOURS"] = CACHE_EXPIRATION_TIME_HOURS


DATE_NOW = datetime.now().isoformat()[0:10] # YYYY-MM-DD
MONTH_NOW = DATE_NOW[:7] # YYYY-MM

# Whether or not to publish the produced files on data.gouv.fr
DECP_PROCESSING_PUBLISH = os.getenv("DECP_PROCESSING_PUBLISH", "").lower() == "true"
print(f"{'DECP_PROCESSING_PUBLISH':<40}", DECP_PROCESSING_PUBLISH)
ALL_CONFIG["DECP_PROCESSING_PUBLISH"] = DECP_PROCESSING_PUBLISH


# HTTP client
HTTP_CLIENT = httpx.Client()
HTTP_HEADERS = {
"Connection": "keep-alive",
"User-agent": "Projet : https://decp.info/a-propos | Client HTTP : https://pypi.org/project/httpx/",
"User-agent": "decp.info",
}

# Timeout for publishing each resource on data.gouv.fr
@@ -77,21 +72,22 @@ def make_path_from_env(env: str, alternative_path: Path) -> Path:

# Root directory
BASE_DIR = make_path_from_env("DECP_BASE_DIR", Path(__file__).absolute().parent.parent)
print(f"{'BASE_DIR':<40}", BASE_DIR)
ALL_CONFIG["BASE_DIR"] = BASE_DIR

# Variables configured on the server must take priority
DATA_DIR = make_path_from_env("DECP_DATA_DIR", BASE_DIR / "data")
DATA_DIR.mkdir(exist_ok=True, parents=True)
print(f"{'DATA_DIR':<40}", DATA_DIR)
ALL_CONFIG["DATA_DIR"] = DATA_DIR

RESOURCE_CACHE_DIR = make_path_from_env(
"RESOURCE_CACHE_DIR", DATA_DIR / "resource_cache"
)
RESOURCE_CACHE_DIR.mkdir(exist_ok=True, parents=True)
ALL_CONFIG["RESOURCE_CACHE_DIR"] = RESOURCE_CACHE_DIR

DIST_DIR = make_path_from_env("DECP_DIST_DIR", BASE_DIR / "dist")
DIST_DIR.mkdir(exist_ok=True, parents=True, mode=777)
print(f"{'DIST_DIR':<40}", DIST_DIR)
ALL_CONFIG["DIST_DIR"] = DIST_DIR


def make_sirene_data_dir(sirene_data_parent_dir) -> Path:
@@ -116,19 +112,19 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path:
SIRENE_DATA_DIR = Path(os.path.join(BASE_DIR, SIRENE_DATA_DIR))

# SIRENE_DATA_DIR is only created when needed, in flows.py
print(f"{'SIRENE_DATA_PARENT_DIR':<40}", SIRENE_DATA_PARENT_DIR)
print(f"{'SIRENE_DATA_DIR':<40}", SIRENE_DATA_DIR)
ALL_CONFIG["SIRENE_DATA_PARENT_DIR"] = SIRENE_DATA_PARENT_DIR
ALL_CONFIG["SIRENE_DATA_DIR"] = SIRENE_DATA_DIR


SIRENE_UNITES_LEGALES_URL = os.getenv("SIRENE_UNITES_LEGALES_URL", "")

# Scraping mode
SCRAPING_MODE = os.getenv("SCRAPING_MODE", "month")
print(f"{'SCRAPING_MODE':<40}", SCRAPING_MODE)
ALL_CONFIG["SCRAPING_MODE"] = SCRAPING_MODE

# Target (target platform for scraping)
SCRAPING_TARGET = os.getenv("SCRAPING_TARGET")
print(f"{'SCRAPING_TARGET':<40}", SCRAPING_TARGET)
ALL_CONFIG["SCRAPING_TARGET"] = SCRAPING_TARGET

# Whether or not to read cached resources
DECP_USE_CACHE = os.getenv("DECP_USE_CACHE", "false").lower() == "true"
@@ -138,6 +134,7 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path:

# Reference data
REFERENCE_DIR = BASE_DIR / "reference"
ALL_CONFIG["REFERENCE_DIR"] = REFERENCE_DIR

# List and order of columns for the base mono dataframe (before normalization and specialization)
# Also used to check that at least these columns are present (others, the "unexpected" columns, may be present as well)
@@ -169,7 +166,7 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path:

# List of resource IDs present in a dataset to process, in JSON or XML format, but excluded from processing
EXCLUDED_RESOURCES = os.getenv("EXCLUDED_RESOURCES", "").replace(" ", "")
print(f"{'EXCLUDED_RESOURCES':<40}", EXCLUDED_RESOURCES)
ALL_CONFIG["EXCLUDED_RESOURCES"] = EXCLUDED_RESOURCES

EXCLUDED_RESOURCES = EXCLUDED_RESOURCES.split(",")
EXCLUDED_RESOURCES = (
@@ -191,7 +188,7 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path:

# Process only a single dataset, identified by its ID
SOLO_DATASET = os.getenv("SOLO_DATASET", "")
print(f"{'SOLO_DATASET':<40}", SOLO_DATASET)
ALL_CONFIG["SOLO_DATASET"] = SOLO_DATASET

with open(
make_path_from_env(
@@ -212,6 +209,3 @@ class DecpFormat:
prefixe_json_marches: str
liste_marches_ijson: sendable_list | None = None
coroutine_ijson: Coroutine | None = None


print("")
96 changes: 58 additions & 38 deletions src/flows/decp_processing.py
@@ -1,17 +1,17 @@
import os
import shutil
from concurrent.futures import ThreadPoolExecutor

import polars as pl
import polars.selectors as cs
from prefect import flow
from prefect import flow, task
from prefect.artifacts import create_table_artifact
from prefect.context import get_run_context
from prefect.task_runners import ConcurrentTaskRunner
from prefect_email import EmailServerCredentials, email_send_message

from src.config import (
ALL_CONFIG,
BASE_DF_COLUMNS,
BASE_DIR,
DATE_NOW,
DECP_PROCESSING_PUBLISH,
DIST_DIR,
@@ -33,15 +33,19 @@
concat_parquet_files,
sort_columns,
)
from src.tasks.utils import full_resource_name, generate_stats, remove_unused_cache
from src.tasks.utils import (
full_resource_name,
generate_stats,
print_all_config,
remove_unused_cache,
)


@flow(
log_prints=True,
task_runner=ConcurrentTaskRunner(max_workers=MAX_PREFECT_WORKERS),
)
@flow(log_prints=True)
def decp_processing(enable_cache_removal: bool = True):
print(f"🚀 Début du flow decp-processing dans base dir {BASE_DIR} ")
print("🚀 Début du flow decp-processing")

print_all_config(ALL_CONFIG)

print("Liste de toutes les ressources des datasets...")
resources: list[dict] = list_resources(TRACKED_DATASETS)
@@ -53,42 +57,24 @@ def decp_processing(enable_cache_removal: bool = True):
available_parquet_files = set(os.listdir(RESOURCE_CACHE_DIR))

# Process resources in parallel, in batches, to avoid memory overload
batch_size = 500
batch_size = 100
parquet_files = []

# Filter the resources to process
# Filter the resources to process, keeping only files > 100 bytes
resources_to_process = [r for r in resources if r["filesize"] > 100]

# To make sure a test dataset is never published by mistake
decp_publish = DECP_PROCESSING_PUBLISH and len(resources_to_process) > 5000

for i in range(0, len(resources_to_process), batch_size):
batch = resources_to_process[i : i + batch_size]
print(
f"🗃️ Traitement du lot {i // batch_size + 1} / {len(resources_to_process) // batch_size + 1}"
process_batch(
available_parquet_files,
batch_size,
i,
parquet_files,
resources_artifact,
resources_to_process,
)

futures = {}
for resource in batch:
future = get_clean.submit(
resource, resources_artifact, available_parquet_files
)
futures[future] = full_resource_name(resource)

for f in futures:
try:
result = f.result()
if result is not None:
parquet_files.append(result)
except Exception as e:
resource_name = futures[f]
print(
f"❌ Erreur de traitement de {resource_name} ({type(e).__name__}):"
)
print(e)

# Explicit cleanup
futures.clear()
# To make sure a test dataset is never published by mistake
decp_publish = DECP_PROCESSING_PUBLISH and len(resources_to_process) > 5000

if decp_publish:
create_table_artifact(
@@ -150,6 +136,40 @@
print("☑️ Fin du flow principal decp_processing.")


@task(retries=2)
def process_batch(
available_parquet_files,
batch_size,
i,
parquet_files,
resources_artifact,
resources_to_process,
):
batch = resources_to_process[i : i + batch_size]
print(
f"🗃️ Traitement du lot {i // batch_size + 1} / {len(resources_to_process) // batch_size + 1}"
)
futures = {}
with ThreadPoolExecutor(max_workers=MAX_PREFECT_WORKERS) as executor:
for resource in batch:
future = executor.submit(
get_clean, resource, resources_artifact, available_parquet_files
)
futures[future] = full_resource_name(resource)

for future in futures:
try:
result = future.result()
if result is not None:
parquet_files.append(result)
except Exception as e:
resource_name = futures[future]
print(f"❌ Erreur de traitement de {resource_name} ({type(e).__name__}):")
print(e)
# Explicit cleanup
futures.clear()


@sirene_preprocess.on_failure
@decp_processing.on_failure
def notify_exception_by_email(flow, flow_run, state):
7 changes: 4 additions & 3 deletions src/tasks/enrich.py
@@ -1,7 +1,6 @@
import polars as pl
import polars.selectors as cs
from polars_ds import haversine
from prefect import task

from src.config import SIRENE_DATA_DIR
from src.tasks.transform import (
@@ -29,7 +28,10 @@ def add_etablissement_data(
# If there is an etablissement_nom (Enseigne1Etablissement or denominationUsuelleEtablissement),
# it is appended to the organization name, in parentheses
lf_sirets = lf_sirets.with_columns(
pl.when(pl.col("etablissement_nom").is_not_null())
pl.when(
pl.col("etablissement_nom").is_not_null()
& (pl.col("etablissement_nom") != pl.col(f"{type_siret}_nom"))
)
.then(
pl.concat_str(
pl.col(f"{type_siret}_nom"),
@@ -76,7 +78,6 @@ def add_unite_legale_data(
return lf_sirets


@task(log_prints=True)
def enrich_from_sirene(lf: pl.LazyFrame):
# Fetch the SIRET/SIREN data prepared in sirene-preprocess()
lf_etablissements = pl.scan_parquet(SIRENE_DATA_DIR / "etablissements.parquet")
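The condition added in `add_etablissement_data` above implements the changelog item about establishment names: the name in parentheses is only appended when it is present and differs from the legal unit's name. Below is a simplified, self-contained sketch of that pattern; the column name `titulaire_nom` is used purely for illustration, while the real code works on a dynamic `{type_siret}_nom` column.

```python
# Simplified sketch of the conditional rename used in add_etablissement_data:
# append the establishment name in parentheses only when it exists and
# differs from the legal unit's name.
import polars as pl

df = pl.DataFrame(
    {
        "titulaire_nom": ["ACME", "ACME", "ACME"],
        "etablissement_nom": ["ACME", "ACME Lyon", None],
    }
)

df = df.with_columns(
    pl.when(
        pl.col("etablissement_nom").is_not_null()
        & (pl.col("etablissement_nom") != pl.col("titulaire_nom"))
    )
    .then(
        pl.concat_str(
            pl.col("titulaire_nom"),
            pl.lit(" ("),
            pl.col("etablissement_nom"),
            pl.lit(")"),
        )
    )
    .otherwise(pl.col("titulaire_nom"))
    .alias("titulaire_nom")
)

# Only the second row becomes "ACME (ACME Lyon)"; the others keep "ACME".
print(df)
```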