5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,8 @@
### 2.6.3 2025-12-16

- Resource downloads made more resilient to errors ([tenacity](https://tenacity.readthedocs.io/en/latest/))
- Establishments (établissements) data downloads made more resilient to errors ([tenacity](https://tenacity.readthedocs.io/en/latest/))

### 2.6.2 2025-12-15

- Reduced the number of Prefect tasks to lower database load and latency
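The 2.6.3 entries describe wrapping the download helpers with tenacity's retry decorator. Below is a minimal sketch of that pattern, reusing the `stop_after_attempt` and `wait_exponential` parameters visible in the `src/tasks/get.py` diff further down; the `fetch` helper and its URL are illustrative placeholders, not code from this repository:

```python
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential


# Retry up to 3 times, with exponential backoff bounded between 1 and 20 seconds.
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=20))
def fetch(url: str) -> bytes:
    # raise_for_status() turns HTTP 4xx/5xx responses into exceptions,
    # which is what triggers a retry.
    response = httpx.get(url, timeout=20, follow_redirects=True)
    response.raise_for_status()
    return response.content
```

Without a `retry=` argument, tenacity retries on any exception; the `get.py` diff below drops the `retry_if_exception_type(httpx.HTTPError)` filter, so timeouts and transport errors are retried as well.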
2 changes: 1 addition & 1 deletion README.md
@@ -1,6 +1,6 @@
# DECP processing

> version 2.6.2 ([notes de version](https://github.com/ColinMaudry/decp-processing/blob/main/CHANGELOG.md))
> version 2.6.3 ([notes de version](https://github.com/ColinMaudry/decp-processing/blob/main/CHANGELOG.md))

Project for processing and publishing better data on public procurement contracts awarded in France. You can browse, filter, and download
this data on the [decp.info](https://decp.info) website. Finally, the [À propos](https://decp.info/a-propos) section describes the project's goals and gathers all the key information.
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,7 +1,7 @@
[project]
name = "decp-processing"
description = "Traitement des données des marchés publics français."
version = "2.6.2"
version = "2.6.3"
requires-python = ">= 3.9"
authors = [
{ name = "Colin Maudry", email = "colin+decp@maudry.com" }
File renamed without changes.
30 changes: 11 additions & 19 deletions src/tasks/get.py
@@ -9,12 +9,11 @@
import ijson
import orjson
import polars as pl
from httpx import Client, HTTPStatusError, TimeoutException, get
from httpx import Client, get
from lxml import etree, html
from prefect.transactions import transaction
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
@@ -44,16 +43,12 @@
)


@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10),
retry=retry_if_exception_type(httpx.HTTPError), # only retry on HTTP errors
)
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=20))
def stream_get(url: str, chunk_size=1024**2): # chunk_size in bytes (1 MB by default)
if url.startswith("http"):
try:
with HTTP_CLIENT.stream(
"GET", url, headers=HTTP_HEADERS, follow_redirects=True
"GET", url, headers=HTTP_HEADERS, follow_redirects=True, timeout=20
) as response:
yield from response.iter_bytes(chunk_size)
except httpx.TooManyRedirects:
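The remainder of this hunk is collapsed, but the new `stream_get` above yields 1 MB byte chunks. A hedged usage sketch, assuming a caller that writes a downloaded resource to disk; the URL and output path are placeholders, not code from this diff:

```python
# Illustrative only: consume stream_get() chunk by chunk so large resources
# never have to be held in memory. URL and path are placeholders.
with open("decp_resource.json", "wb") as out:
    for chunk in stream_get("https://example.org/decp.json"):
        out.write(chunk)
```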
@@ -397,22 +392,19 @@ def get_etablissements() -> pl.LazyFrame:
hrefs.append(base_url + href)

# Processing function for one file
@retry(
stop=stop_after_attempt(4), wait=wait_exponential(multiplier=1, min=1, max=20)
)
def get_process_file(_href: str):
print(_href.split("/")[-1])
try:
response = http_client.get(
_href, headers=HTTP_HEADERS, timeout=20
).raise_for_status()
except (HTTPStatusError, TimeoutException) as err:
print(err)
print("Nouvel essai...")
response = http_client.get(
_href, headers=HTTP_HEADERS, timeout=20
).raise_for_status()
response = http_client.get(
_href, headers=HTTP_HEADERS, timeout=30
).raise_for_status()

content = response.content
lff = pl.scan_csv(content, schema_overrides=schema)
lff = lff.select(columns)
print(_href.split("/")[-1], "OK")

return lff

# Parallel processing with 8 threads
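The comment above mentions parallel processing with 8 threads, but the rest of the hunk is collapsed in this view. A minimal sketch of how `get_process_file` could be fanned out, assuming `concurrent.futures.ThreadPoolExecutor` and a final `pl.concat` of the per-file LazyFrames; the actual implementation in `get.py` may differ:

```python
from concurrent.futures import ThreadPoolExecutor

import polars as pl

# Sketch under assumptions: scan the établissements files with 8 worker
# threads, then concatenate the resulting LazyFrames. Thanks to the @retry
# decorator above, each failing download is attempted up to 4 times.
with ThreadPoolExecutor(max_workers=8) as executor:
    lazy_frames = list(executor.map(get_process_file, hrefs))

etablissements = pl.concat(lazy_frames)
```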