Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,9 @@ repos:
(?x)
^airflow-ctl.*\.py$|
^airflow-core/src/airflow/models/.*\.py$|
^airflow-core/src/airflow/dag_processing/.*\.py$|
^airflow-core/src/airflow/migrations/.*\.py$|
^airflow-core/src/airflow/utils/.*\.py$|
^airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py$|
^airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py$|
^airflow-core/tests/unit/models/test_serialized_dag.py$|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ def _extract_and_sign_template(bundle_name: str) -> tuple[str | None, dict]:
self.log.debug("Signed URL template for bundle %s", bundle_name)
return new_template_, new_params_

stored = {b.name: b for b in session.query(DagBundleModel).all()}
stored = {b.name: b for b in session.scalars(select(DagBundleModel)).all()}
bundle_to_team = {
bundle.name: bundle.teams[0].name if len(bundle.teams) == 1 else None
for bundle in stored.values()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import sqlalchemy as sa
from alembic import context, op
from sqlalchemy import select
from sqlalchemy.orm import lazyload

from airflow.models.trigger import Trigger
Expand Down Expand Up @@ -60,7 +61,7 @@ def upgrade():
if not context.is_offline_mode():
session = get_session()
try:
for trigger in session.query(Trigger).options(lazyload(Trigger.task_instance)):
for trigger in session.scalars(select(Trigger).options(lazyload(Trigger.task_instance))):
trigger.kwargs = trigger.kwargs
session.commit()
finally:
Expand All @@ -81,7 +82,7 @@ def downgrade():
else:
session = get_session()
try:
for trigger in session.query(Trigger).options(lazyload(Trigger.task_instance)):
for trigger in session.scalars(select(Trigger).options(lazyload(Trigger.task_instance))):
trigger.encrypted_kwargs = json.dumps(BaseSerialization.serialize(trigger.kwargs))
session.commit()
finally:
Expand Down
27 changes: 14 additions & 13 deletions airflow-core/src/airflow/utils/db_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@

if TYPE_CHECKING:
from pendulum import DateTime
from sqlalchemy.orm import Query, Session
from sqlalchemy.orm import Session
from sqlalchemy.sql.selectable import Select

from airflow.models import Base

Expand Down Expand Up @@ -163,17 +164,17 @@ def readable_config(self):
config_dict: dict[str, _TableConfig] = {x.orm_model.name: x for x in sorted(config_list)}


def _check_for_rows(*, query: Query, print_rows: bool = False) -> int:
num_entities = query.count()
def _check_for_rows(*, query: Select, print_rows: bool = False, session: Session) -> int:
num_entities = session.execute(select(func.count()).select_from(query.subquery())).scalar()
print(f"Found {num_entities} rows meeting deletion criteria.")
if not print_rows or num_entities == 0:
return num_entities

max_rows_to_print = 100
print(f"Printing first {max_rows_to_print} rows.")
logger.debug("print entities query: %s", query)
for entry in query.limit(max_rows_to_print):
print(entry.__dict__)
for entry in session.execute(query.limit(max_rows_to_print)):
print(entry._asdict())
return num_entities


Expand All @@ -193,7 +194,7 @@ def _dump_table_to_file(*, target_table: str, file_path: str, export_format: str


def _do_delete(
*, query: Query, orm_model: Base, skip_archive: bool, session: Session, batch_size: int | None
*, query: Select, orm_model: Base, skip_archive: bool, session: Session, batch_size: int | None
) -> None:
import itertools
import re
Expand All @@ -204,7 +205,7 @@ def _do_delete(

while True:
limited_query = query.limit(batch_size) if batch_size else query
if limited_query.count() == 0: # nothing left to delete
if session.execute(select(func.count()).select_from(limited_query.subquery())).scalar() == 0:
break

batch_no = next(batch_counter)
Expand Down Expand Up @@ -233,7 +234,7 @@ def _do_delete(
logger.debug("insert statement:\n%s", insert_stm.compile())
session.execute(insert_stm)
else:
stmt = CreateTableAs(target_table_name, limited_query.selectable)
stmt = CreateTableAs(target_table_name, limited_query)
logger.debug("ctas query:\n%s", stmt.compile())
session.execute(stmt)
session.commit()
Expand Down Expand Up @@ -309,10 +310,10 @@ def _build_query(
clean_before_timestamp: DateTime,
session: Session,
**kwargs,
) -> Query:
) -> Select:
base_table_alias = "base"
base_table = aliased(orm_model, name=base_table_alias)
query = session.query(base_table).with_entities(text(f"{base_table_alias}.*"))
query = select(text(f"{base_table_alias}.*")).select_from(base_table)
base_table_recency_col = base_table.c[recency_column.name]
conditions = [base_table_recency_col < clean_before_timestamp]
if keep_last:
Expand All @@ -325,7 +326,7 @@ def _build_query(
max_date_colname=max_date_col_name,
session=session,
)
query = query.select_from(base_table).outerjoin(
query = query.outerjoin(
subquery,
and_(
*[base_table.c[x] == subquery.c[x] for x in keep_last_group_by], # type: ignore[attr-defined]
Expand Down Expand Up @@ -364,9 +365,9 @@ def _cleanup_table(
clean_before_timestamp=clean_before_timestamp,
session=session,
)
logger.debug("old rows query:\n%s", query.selectable.compile())
logger.debug("old rows query:\n%s", query.compile())
print(f"Checking table {orm_model.name}")
num_rows = _check_for_rows(query=query, print_rows=False)
num_rows = _check_for_rows(query=query, print_rows=False, session=session)

if num_rows and not dry_run:
_do_delete(
Expand Down
Loading