55 changes: 55 additions & 0 deletions alembic/versions/20260112_232724_ea6989325f62_s3_upload_delegation.py
@@ -0,0 +1,55 @@
"""s3_upload_delegation

Revision ID: ea6989325f62
Revises: 642ef45b49c6
Create Date: 2026-01-12 23:27:24.210332

"""

from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa
from alembic_postgresql_enum import TableReference
from sqlalchemy.dialects import postgresql

from sqlalchemy import Text
import app.db.types

# revision identifiers, used by Alembic.
revision: str = "ea6989325f62"
down_revision: Union[str, None] = "642ef45b49c6"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column(
        "asset", sa.Column("upload_meta", postgresql.JSONB(astext_type=sa.Text()), nullable=True)
    )
    op.sync_enum_values(
        enum_schema="public",
        enum_name="assetstatus",
        new_values=["CREATED", "UPLOADING", "DELETED"],
        affected_columns=[
            TableReference(table_schema="public", table_name="asset", column_name="status")
        ],
        enum_values_to_rename=[],
    )
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.sync_enum_values(
        enum_schema="public",
        enum_name="assetstatus",
        new_values=["CREATED", "DELETED"],
        affected_columns=[
            TableReference(table_schema="public", table_name="asset", column_name="status")
        ],
        enum_values_to_rename=[],
    )
    op.drop_column("asset", "upload_meta")
    # ### end Alembic commands ###
54 changes: 54 additions & 0 deletions alembic/versions/20260112_232729_4d9640ae6ba0_update_triggers.py
@@ -0,0 +1,54 @@
"""Update triggers

Revision ID: 4d9640ae6ba0
Revises: ea6989325f62
Create Date: 2026-01-12 23:27:29.559400

"""

from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


from sqlalchemy import Text
import app.db.types

# revision identifiers, used by Alembic.
revision: str = "4d9640ae6ba0"
down_revision: Union[str, None] = "ea6989325f62"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_index(
        "ix_asset_full_path",
        "asset",
        ["full_path"],
        unique=True,
        postgresql_where=sa.text("status != 'DELETED'"),
    )
    op.create_index(
        "uq_asset_entity_id_path",
        "asset",
        ["path", "entity_id"],
        unique=True,
        postgresql_where=sa.text("status != 'DELETED'"),
    )
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_index(
        "uq_asset_entity_id_path",
        table_name="asset",
        postgresql_where=sa.text("status != 'DELETED'"),
    )
    op.drop_index(
        "ix_asset_full_path", table_name="asset", postgresql_where=sa.text("status != 'DELETED'")
    )
    # ### end Alembic commands ###
7 changes: 7 additions & 0 deletions app/config.py
@@ -48,6 +48,13 @@ class Settings(BaseSettings):
    S3_MULTIPART_THRESHOLD: int = 5 * 1024**2  # bytes  # TODO: decide an appropriate value
    S3_PRESIGNED_URL_EXPIRATION: int = 600  # seconds  # TODO: decide an appropriate value

+    S3_MULTIPART_UPLOAD_MAX_SIZE: int = 5 * 1024**3  # TODO: Set appropriate upper file size limit
+    S3_MULTIPART_UPLOAD_MIN_PART_SIZE: int = 5 * 1024**2
+    S3_MULTIPART_UPLOAD_MAX_PART_SIZE: int = 5 * 1024**3
Review comment (Collaborator):
    this seems very high, no?

Reply (@eleftherioszisis, Contributor Author, Jan 13, 2026):
    Yeah, these parameters need some fine-tuning. These are the S3 limits; we may want to stay well below them.

+    S3_MULTIPART_UPLOAD_MIN_PARTS: int = 1
+    S3_MULTIPART_UPLOAD_MAX_PARTS: int = 10_000
+    S3_MULTIPART_UPLOAD_DEFAULT_PARTS: int = 100
+
    API_ASSET_POST_MAX_SIZE: int = 150 * 1024**2  # bytes  # TODO: decide an appropriate value

    DB_ENGINE: str = "postgresql+psycopg2"
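
A note on the thread above: AWS S3's documented multipart limits are a 5 MiB to 5 GiB part size, at most 10,000 parts, and a 5 TiB object ceiling, so the new MIN/MAX part-size defaults mirror S3 itself rather than a product decision. A minimal sketch of the arithmetic these settings imply (plain Python; the variable names are shorthand for the settings added in this PR):

GiB = 1024**3
MiB = 1024**2

max_size = 5 * GiB   # S3_MULTIPART_UPLOAD_MAX_SIZE
min_part = 5 * MiB   # S3_MULTIPART_UPLOAD_MIN_PART_SIZE
max_part = 5 * GiB   # S3_MULTIPART_UPLOAD_MAX_PART_SIZE
max_parts = 10_000   # S3_MULTIPART_UPLOAD_MAX_PARTS

# The part limits alone would allow 10,000 parts of 5 GiB each, ~48.8 TiB,
# which is why the reviewer flagged the max part size as very high relative
# to the 5 GiB MAX_SIZE.
print(max_parts * max_part // GiB)  # 50000 (GiB)

# At the configured 5 GiB MAX_SIZE, minimum-size parts already suffice:
print(max_size // min_part)  # 1024 parts, well under the 10,000-part ceiling
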
64 changes: 49 additions & 15 deletions app/db/events.py
@@ -3,8 +3,9 @@
from sqlalchemy.orm.session import object_session

from app.db.model import Asset
+from app.db.types import AssetStatus
from app.logger import L
-from app.utils.s3 import delete_asset_storage_object, get_s3_client
+from app.utils.s3 import delete_asset_storage_object, get_s3_client, multipart_upload_abort

ASSETS_TO_DELETE_KEY = "assets_to_delete_from_storage"

@@ -22,22 +23,55 @@ def collect_asset_for_storage_deletion(_mapper, _connection, target: Asset):

@event.listens_for(Session, "after_commit")
def delete_assets_from_storage(session: Session):
"""Delete storage objects for assets removed in a committed transaction."""
"""Delete storage objects for assets removed in a committed transaction.

Note: Due to the nature of the operation that iterates over all assets it is important to not
throw an error even if one of the external side-effect fail. Otherwise, after the rollback
there might be db assets that are not deleted but their s3 files are.

Instead with capturing the errors it is ensured that db assets are always deleted even if that
may result in orphan files or multipart uploads that have failed to be deleted.

TODO: Add a cleanup function on a schedule that would remove s3 orphans from time to time.
"""
    to_delete = session.info.pop(ASSETS_TO_DELETE_KEY, set())
    for asset in to_delete:
-        try:
-            delete_asset_storage_object(
-                storage_type=asset.storage_type,
-                s3_key=asset.full_path,
-                storage_client_factory=get_s3_client,
-            )
-        except Exception:  # noqa: BLE001
-            L.exception(
-                "Failed to delete storage object for Asset id={} full_path={} storage_type={}",
-                asset.id,
-                asset.full_path,
-                asset.storage_type,
-            )
+        match asset.status:
+            case AssetStatus.UPLOADING:
+                try:
+                    multipart_upload_abort(
+                        upload_id=asset.upload_meta["upload_id"],
+                        storage_type=asset.storage_type,
+                        s3_key=asset.full_path,
+                        storage_client_factory=get_s3_client,
+                    )
+                except Exception:  # noqa: BLE001
+                    L.exception(
+                        (
+                            "Failed to abort multipart upload for Asset "
+                            "id={} full_path={} storage_type={}"
+                        ),
+                        asset.id,
+                        asset.full_path,
+                        asset.storage_type,
+                    )
+            case _:
+                try:
+                    delete_asset_storage_object(
+                        storage_type=asset.storage_type,
+                        s3_key=asset.full_path,
+                        storage_client_factory=get_s3_client,
+                    )
+                except Exception:  # noqa: BLE001
+                    L.exception(
+                        (
+                            "Failed to delete storage object for Asset "
+                            "id={} full_path={} storage_type={}"
+                        ),
+                        asset.id,
+                        asset.full_path,
+                        asset.storage_type,
+                    )


@event.listens_for(Session, "after_rollback")
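
The docstring's TODO could be served by a periodic job along these lines. This is a sketch only: it assumes a boto3 client, and that every asset's full_path is its s3 key; none of these helper names exist in this PR.

import boto3
from sqlalchemy import select

from app.db.model import Asset


def cleanup_s3_orphans(session, bucket: str, prefix: str = "") -> int:
    """Delete s3 objects whose keys match no Asset.full_path in the db (sketch)."""
    s3 = boto3.client("s3")
    # Every full_path known to the db is treated as live; a real job would
    # refine this (e.g. exclude DELETED rows) and avoid loading all keys at once.
    live_keys = set(session.scalars(select(Asset.full_path)))
    deleted = 0
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            if obj["Key"] not in live_keys:
                s3.delete_object(Bucket=bucket, Key=obj["Key"])  # orphan: no db row
                deleted += 1
    return deleted

A fuller version would also abort stale multipart uploads, e.g. via s3.list_multipart_uploads and s3.abort_multipart_upload.
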
2 changes: 2 additions & 0 deletions app/db/model.py
@@ -1438,6 +1438,8 @@ class Asset(Identifiable):
    )
    storage_type: Mapped[StorageType]

+    upload_meta: Mapped[JSON_DICT | None]
+
    # partial unique index
    __table_args__ = (
        Index(
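
As the after-commit handler shows, the only key this PR reads from the new column is "upload_id"; the rest of the schema is not fixed by this diff. A hypothetical value for an asset mid-upload:

# Hypothetical upload_meta content for an Asset in UPLOADING status; only
# "upload_id" is actually read by delete_assets_from_storage in this PR.
upload_meta = {
    "upload_id": "VXBsb2FkSWQtZXhhbXBsZQ",  # id returned by s3 create_multipart_upload
}
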
1 change: 1 addition & 0 deletions app/db/types.py
@@ -229,6 +229,7 @@ class AnnotationBodyType(StrEnum):

class AssetStatus(StrEnum):
    CREATED = auto()
+    UPLOADING = auto()
    DELETED = auto()


2 changes: 2 additions & 0 deletions app/errors.py
@@ -46,6 +46,8 @@ class ApiErrorCode(UpperStrEnum):
    ASSET_NOT_A_DIRECTORY = auto()
    ASSET_INVALID_SCHEMA = auto()
    ASSET_INVALID_CONTENT_TYPE = auto()
+    ASSET_UPLOAD_INCOMPLETE = auto()
+    ASSET_NOT_UPLOADING = auto()
    ION_NAME_NOT_FOUND = auto()
    S3_CANNOT_CREATE_PRESIGNED_URL = auto()
    OPENAI_API_KEY_MISSING = auto()
6 changes: 4 additions & 2 deletions app/repository/asset.py
@@ -37,11 +37,13 @@ def get_entity_asset(

        return self.db.execute(query).scalar_one()

-    def create_entity_asset(self, entity_id: uuid.UUID, asset: AssetCreate) -> Asset:
+    def create_entity_asset(
+        self, entity_id: uuid.UUID, asset: AssetCreate, status: AssetStatus = AssetStatus.CREATED
+    ) -> Asset:
        """Create an asset associated with the given entity."""
        sha256_digest = bytes.fromhex(asset.sha256_digest) if asset.sha256_digest else None
        db_asset = Asset(
-            status=AssetStatus.CREATED,
+            status=status,
            entity_id=entity_id,
            path=asset.path,
            full_path=asset.full_path,
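
A sketch of how a caller might use the new status parameter when delegating an upload; the repository instance and the AssetCreate values are assumed for illustration and are not shown in this diff:

# Hypothetical multipart-upload flow using the new default parameter; repo,
# entity_id, and the AssetCreate field values are illustrative.
asset = repo.create_entity_asset(
    entity_id=entity_id,
    asset=AssetCreate(path="data/raw.h5", full_path="entity/data/raw.h5"),
    status=AssetStatus.UPLOADING,  # row exists while parts are still uploading
)
# Once the client completes the multipart upload, the service would mark the
# asset CREATED; until then, deletion paths abort the s3 upload as in events.py.
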