Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 76 additions & 33 deletions src/tasks/clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ def clean_decp(lf: pl.LazyFrame, decp_format: DecpFormat) -> pl.LazyFrame:
# Codes CPV, suppression du caractères de contrôle ("-[0-9]$")
lf = lf.with_columns(pl.col("codeCPV").str.split("-").list[0].alias("codeCPV"))

# Nettoyage des codes département (ex: "006" -> "06")
lf = clean_lieu_execution_code(lf)

# Champs liste
lf = process_string_lists(lf)

Expand Down Expand Up @@ -172,6 +175,34 @@ def clean_invalid_characters(chunk: bytes):
return bytes(chunk, "utf-8")


def clean_lieu_execution_code(lf: pl.LazyFrame) -> pl.LazyFrame:
"""
Corrige les codes département mal formatés.

Certains codes département sont sur 3 chiffres avec un zéro devant (ex: "006" au lieu de "06").
Cette fonction supprime le zéro initial pour les codes métropolitains, tout en préservant
les codes DOM-TOM légitimes sur 3 chiffres (971, 972, 973, 974, 976).
"""
columns = lf.collect_schema().names()

if "lieuExecution_code" not in columns or "lieuExecution_typeCode" not in columns:
return lf

lf = lf.with_columns(
pl.when(
(pl.col("lieuExecution_typeCode") == "Code département")
& (pl.col("lieuExecution_code").str.len_chars() == 3)
& (pl.col("lieuExecution_code").str.starts_with("0"))
& (pl.col("lieuExecution_code").str.contains(r"^\d{3}$"))
)
.then(pl.col("lieuExecution_code").str.slice(1)) # Supprimer le premier caractère (le "0")
.otherwise(pl.col("lieuExecution_code"))
.alias("lieuExecution_code")
)

return lf


def clean_null_equivalent(lf: pl.LazyFrame) -> pl.LazyFrame:
"""Supprime les strings équivalente à null"""
mapping_null = {
Expand All @@ -198,61 +229,73 @@ def clean_null_equivalent(lf: pl.LazyFrame) -> pl.LazyFrame:
return lf


def clean_titulaires(lf: pl.LazyFrame, decp_format: DecpFormat, column) -> pl.LazyFrame:
def clean_titulaires(lf: pl.LazyFrame, decp_format: DecpFormat, column: str) -> pl.LazyFrame:
"""
Normalise les listes de titulaires en utilisant des expressions Polars natives.
`column` peut être titulaires ou modification_titulaires
Codée avec l'assistance du LLM Gemini 3 Pro et révisée par le développeur.

Note: Les données scrappées de marches-publics.info (AWS) ont un format hybride :
- Enveloppe JSON format 2022 (marches.marche)
- Titulaires format 2019 (pas de clé "titulaire" imbriquée)
On détecte donc le format des titulaires dynamiquement.
"""
# Définition des expressions de nettoyage selon le format
if decp_format.label == "DECP 2022":
# Skip processing if column dtype is Null (all values are null)
if lf.collect_schema()[column] == pl.Null:
return lf

# Détection du format des titulaires en regardant la structure du schéma
titulaires_schema = lf.collect_schema()[column]

# Vérifier si le schéma a une clé "titulaire" imbriquée
# Format 2022 : List(Struct({titulaire: Struct({id, typeIdentifiant})}))
# Format 2019 ou AWS hybride : List(Struct({id, typeIdentifiant, ...}))
has_nested_titulaire = False
if hasattr(titulaires_schema, "inner"):
inner_struct = titulaires_schema.inner
if hasattr(inner_struct, "fields"):
field_names = [f.name for f in inner_struct.fields]
has_nested_titulaire = "titulaire" in field_names

if has_nested_titulaire:
# Format 2022 : [{"titulaire": {"id": ..., "typeIdentifiant": ...}}]
# On veut extraire les champs de la struct imbriquée
expr_titulaire = pl.struct(
titulaire_id=pl.element().struct.field("titulaire").struct.field("id"),
titulaire_typeIdentifiant=pl.element()
.struct.field("titulaire")
.struct.field("typeIdentifiant"),
)

else:
# Format 2019 : [{"id": ..., "typeIdentifiant": ...}]
# On renomme juste les champs
# Format 2019 ou AWS hybride : [{"id": ..., "typeIdentifiant": ...}]
expr_titulaire = pl.struct(
titulaire_id=pl.element().struct.field("id"),
titulaire_typeIdentifiant=pl.element().struct.field("typeIdentifiant"),
)

# Skip processing if column dtype is Null (all values are null)
# This happens when modification_titulaires has no actual data
if lf.collect_schema()[column] != pl.Null:
lf = lf.with_columns(
pl.col(column)
.list.eval(expr_titulaire)
.list.eval(
# Filtrer les éléments où id ET typeIdentifiant sont null
pl.element().filter(
pl.element().struct.field("titulaire_id").is_not_null()
| pl.element()
.struct.field("titulaire_typeIdentifiant")
.is_not_null()
)
lf = lf.with_columns(
pl.col(column)
.list.eval(expr_titulaire)
.list.eval(
# Filtrer les éléments où id ET typeIdentifiant sont null
pl.element().filter(
pl.element().struct.field("titulaire_id").is_not_null()
| pl.element()
.struct.field("titulaire_typeIdentifiant")
.is_not_null()
)
.alias(column)
)
.alias(column)
)

# Remplacer les listes de titulaires vides par null
# Only process columns that have List dtype

if lf.collect_schema()[column] != pl.Null:
lf = lf.with_columns(
[
pl.when(pl.col(column).list.len() == 0)
.then(None)
.otherwise(pl.col(column))
.alias(column)
]
)
lf = lf.with_columns(
[
pl.when(pl.col(column).list.len() == 0)
.then(None)
.otherwise(pl.col(column))
.alias(column)
]
)

return lf

Expand Down
55 changes: 42 additions & 13 deletions src/tasks/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,10 @@ def write_marche_rows(marche: dict, file, decp_format: DecpFormat) -> set[str]:
"""Ajout d'une ligne ndjson pour chaque modification/version du marché."""
fields = set()
if marche: # marche peut être null (marches-securises.fr)
for mod in yield_modifications(marche):
# Déterminer le format cible pour les titulaires
target_format = "2022" if decp_format.label == "DECP 2022" else "2019"

for mod in yield_modifications(marche, target_format=target_format):
if mod is None:
continue
# Pour decp-2019.json : désimbrication des données des titulaires
Expand All @@ -318,7 +321,9 @@ def write_marche_rows(marche: dict, file, decp_format: DecpFormat) -> set[str]:
return fields


def yield_modifications(row: dict, separator="_") -> Iterator[dict] or None:
def yield_modifications(
row: dict, separator="_", target_format: str = "2022"
) -> Iterator[dict] or None:
"""Pour chaque modification, génère un objet/dict marché aplati."""
raw_mods = row.pop("modifications", [])
# Couvre le format 2022:
Expand All @@ -331,12 +336,16 @@ def yield_modifications(row: dict, separator="_") -> Iterator[dict] or None:
elif isinstance(raw_mods, str) or raw_mods is None:
raw_mods = []

# Normaliser les titulaires du marché principal vers le format cible
if "titulaires" in row and row["titulaires"]:
row["titulaires"] = norm_titulaires(row["titulaires"], target_format)

mods = [{}] + raw_mods
for i, mod in enumerate(mods):
mod["id"] = i
if "modification" in mod:
mod = mod["modification"]
titulaires = norm_titulaires(mod)
titulaires = norm_titulaires(mod.get("titulaires"), target_format)
if titulaires is not None:
mod["titulaires"] = titulaires
row["modification"] = mod
Expand All @@ -345,32 +354,52 @@ def yield_modifications(row: dict, separator="_") -> Iterator[dict] or None:
)


def norm_titulaires(titulaires):
def norm_titulaires(titulaires, target_format: str = "2022"):
"""
Corrige les blocs titulaires imbriqués dans n niveaux de listes.
Corrige les blocs titulaires imbriqués dans n niveaux de listes et normalise
vers le format cible (2019 ou 2022).

:param titulaires:
:return: titulaires:
:param titulaires: liste de titulaires
:param target_format: "2019" ou "2022"
:return: titulaires normalisés
"""
if isinstance(titulaires, list):
titulaires_clean = []
for t in titulaires:
if isinstance(t, dict):
titulaires_clean.append(norm_titulaire(t))
titulaires_clean.append(norm_titulaire(t, target_format))
elif isinstance(t, list):
# Traite les listes de titulaires écrites en listes de listes.
for inner_t in t:
if isinstance(inner_t, dict):
titulaires_clean.append(norm_titulaire(inner_t))
titulaires_clean.append(norm_titulaire(inner_t, target_format))
return titulaires_clean
return None


def norm_titulaire(titulaire: dict, target_format: str = "2022"):
"""
Normalise un titulaire vers le format cible.

Format 2019 : {"id": ..., "typeIdentifiant": ...}
Format 2022 : {"titulaire": {"id": ..., "typeIdentifiant": ...}}

Note: Les données AWS (marches-publics.info) utilisent une enveloppe format 2022
mais avec des titulaires au format 2019. On normalise donc tout vers le format cible.
"""
if target_format == "2022":
if "titulaire" not in titulaire:
# Format 2019 ou AWS hybride - convertir en format 2022
return {"titulaire": titulaire}
return titulaire
else:
# Format 2019
if "titulaire" in titulaire:
return titulaire["titulaire"]
return titulaire


# Récupération des données des établissements
def norm_titulaire(titulaire: dict):
if "titulaire" in titulaire:
titulaire = titulaire["titulaire"]
return titulaire


def get_etablissements() -> pl.LazyFrame:
Expand Down