Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 25 additions & 8 deletions cwms/catalog/blobs.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import base64
from typing import Optional
from typing import Any, Optional

import cwms.api as api
from cwms.cwms_types import JSON, Data
from cwms.utils.checks import is_base64
from cwms.utils.checks import has_invalid_chars, is_base64

STORE_DICT = """data = {
"office-id": "SWT",
Expand All @@ -14,6 +14,8 @@
}
"""

IGNORED_ID = "ignored"


def get_blob(blob_id: str, office_id: str) -> str:
"""Get a single BLOB (Binary Large Object).
Expand All @@ -29,8 +31,13 @@ def get_blob(blob_id: str, office_id: str) -> str:
str: the value returned based on the content-type it was stored with as a string
"""

endpoint = f"blobs/{blob_id}"
params = {"office": office_id}
params: dict[str, Any] = {}
if has_invalid_chars(blob_id):
endpoint = f"blobs/{IGNORED_ID}"
params["blob-id"] = blob_id
else:
endpoint = f"blobs/{blob_id}"
params["office"] = office_id
response = api.get(endpoint, params, api_version=1)
return str(response)

Expand Down Expand Up @@ -107,8 +114,13 @@ def delete_blob(blob_id: str, office_id: str) -> None:
None
"""

endpoint = f"blobs/{blob_id}"
params = {"office": office_id}
params: dict[str, Any] = {}
if has_invalid_chars(blob_id):
endpoint = f"blobs/{IGNORED_ID}"
params["blob-id"] = blob_id
else:
endpoint = f"blobs/{blob_id}"
params["office"] = office_id
return api.delete(endpoint, params, api_version=1)


Expand Down Expand Up @@ -143,6 +155,11 @@ def update_blob(data: JSON, fail_if_not_exists: Optional[bool] = True) -> None:

blob_id = data.get("id", "").upper()

endpoint = f"blobs/{blob_id}"
params = {"fail-if-not-exists": fail_if_not_exists}
params: dict[str, Any] = {}
if has_invalid_chars(blob_id):
endpoint = f"blobs/{IGNORED_ID}"
params["blob-id"] = blob_id
else:
endpoint = f"blobs/{blob_id}"
params["fail-if-not-exists"] = fail_if_not_exists
return api.patch(endpoint, data, params, api_version=1)
67 changes: 44 additions & 23 deletions cwms/catalog/clobs.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
from typing import Optional
from typing import Any, Optional

import cwms.api as api
from cwms.cwms_types import JSON, Data
from cwms.utils.checks import has_invalid_chars

STORE_DICT = """data = {
"office-id": "SWT",
"id": "CLOB_ID",
"description": "Your description here",
"value": "STRING of content"
}
"""

def get_clob(clob_id: str, office_id: str, clob_id_query: Optional[str] = None) -> Data:
IGNORED_ID = "ignored"


def get_clob(clob_id: str, office_id: str) -> Data:
"""Get a single clob.

Parameters
Expand All @@ -13,28 +24,20 @@ def get_clob(clob_id: str, office_id: str, clob_id_query: Optional[str] = None)
Specifies the id of the clob
office_id: string
Specifies the office of the clob.
clob_id_query: string
If this query parameter is provided the id path parameter is ignored and the
value of the query parameter is used. Note: this query parameter is necessary
for id's that contain '/' or other special characters. Because of abuse even
properly escaped '/' in url paths are blocked. When using this query parameter
a valid path parameter must still be provided for the request to be properly
routed. If your clob id contains '/' you can't specify the clob-id query
parameter and also specify the id path parameter because firewall and/or server
rules will deny the request even though you are specifying this override. "ignored"
is suggested.


Returns
-------
cwms data type. data.json will return the JSON output and data.df will return a dataframe
"""

endpoint = f"clobs/{clob_id}"
params = {
"office": office_id,
"clob-id-query": clob_id_query,
}
params: dict[str, Any] = {}
if has_invalid_chars(clob_id):
endpoint = f"clobs/{IGNORED_ID}"
params["clob-id"] = clob_id
else:
endpoint = f"clobs/{clob_id}"
params["office"] = office_id
response = api.get(endpoint, params)
return Data(response)

Expand Down Expand Up @@ -90,13 +93,20 @@ def delete_clob(clob_id: str, office_id: str) -> None:
None
"""

endpoint = f"clobs/{clob_id}"
params = {"office": office_id}
params: dict[str, Any] = {}
if has_invalid_chars(clob_id):
endpoint = f"clobs/{IGNORED_ID}"
params["clob-id"] = clob_id
else:
endpoint = f"clobs/{clob_id}"
params["office"] = office_id

return api.delete(endpoint, params=params, api_version=1)


def update_clob(data: JSON, clob_id: str, ignore_nulls: Optional[bool] = True) -> None:
def update_clob(
data: JSON, clob_id: Optional[str] = None, ignore_nulls: Optional[bool] = True
) -> None:
"""Updates clob

Parameters
Expand All @@ -110,7 +120,7 @@ def update_clob(data: JSON, clob_id: str, ignore_nulls: Optional[bool] = True) -
"value": "string"
}
clob_id: string
Specifies the id of the clob to be deleted
Specifies the id of the clob to be updated. Unused if "id" is present in JSON data.
ignore_nulls: Boolean
If true, null and empty fields in the provided clob will be ignored and the existing value of those fields left in place. Default: true

Expand All @@ -122,8 +132,19 @@ def update_clob(data: JSON, clob_id: str, ignore_nulls: Optional[bool] = True) -
if not isinstance(data, dict):
raise ValueError("Cannot store a Clob without a JSON data dictionary")

endpoint = f"clobs/{clob_id}"
params = {"ignore-nulls": ignore_nulls}
if "id" in data:
clob_id = data.get("id", "").upper()

if clob_id is None:
raise ValueError(f"Cannot update a Clob without an 'id' field:\n{STORE_DICT}")

params: dict[str, Any] = {}
if has_invalid_chars(clob_id):
endpoint = f"clobs/{IGNORED_ID}"
params["clob-id"] = clob_id
else:
endpoint = f"clobs/{clob_id}"
params["ignore-nulls"] = ignore_nulls

return api.patch(endpoint, data, params, api_version=1)

Expand Down
12 changes: 12 additions & 0 deletions cwms/utils/checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,15 @@ def is_base64(s: str) -> bool:
return base64.b64encode(decoded).decode("utf-8") == s
except (ValueError, TypeError):
return False


def has_invalid_chars(id: str) -> bool:
    """
    Report whether an ID contains a character that is unsafe in a web path.

    The characters looked for are the forward slash, the backslash, the
    ampersand, the question mark and the equals sign.

    Parameters
    ----------
        id: string
            The identifier to inspect.

    Returns
    -------
        bool: True if at least one of those characters occurs in the ID,
        False otherwise (including for an empty ID).
    """
    unsafe_symbols = ("/", "\\", "&", "?", "=")
    return any(symbol in id for symbol in unsafe_symbols)
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ services:
condition: service_completed_successfully
traefik:
condition: service_healthy
image: ${CWMS_DATA_API_IMAGE:-ghcr.io/usace/cwms-data-api:latest-dev}
image: ${CWMS_DATA_API_IMAGE:-ghcr.io/usace/cwms-data-api:develop-nightly}
restart: unless-stopped
volumes:
- ./compose_files/pki/certs:/conf/
Expand Down
135 changes: 135 additions & 0 deletions tests/cda/blobs/blob_CDA_path_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# tests/cda/blobs/blob_CDA_path_test.py
from __future__ import annotations

import base64
import mimetypes
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

import pandas as pd
import pytest

import cwms.catalog.blobs as blobs

# Office id used for every blob request made by this module.
TEST_OFFICE = "MVP"
# The id deliberately starts with and contains "/" characters.
# NOTE(review): presumably chosen so the id cannot be sent as a URL path segment - confirm.
TEST_BLOB_ID = "/PYTEST/BLOB/ALPHA"
TEST_BLOB_UPDATED_ID = TEST_BLOB_ID  # keeping same id; update modifies fields
TEST_MEDIA_TYPE = "text/plain"
# NOTE(review): the "?" in both descriptions may be a mis-encoded dash - confirm
# against the original file before changing it, since the tests compare this text.
TEST_DESC = "pytest blob ? initial"
TEST_DESC_UPDATED = "pytest blob ? updated"
# The payload embeds the UTC time (to the second) at import, so it differs between runs.
TEST_TEXT = "Hello from pytest @ " + datetime.now(timezone.utc).isoformat(
timespec="seconds"
)
# Content stored by the update test: the original text plus a suffix.
TEST_TEXT_UPDATED = TEST_TEXT + " (edited)"


@pytest.fixture(scope="module", autouse=True)
def ensure_clean_slate():
    """Remove the module's test blob both before and after the module's tests run.

    Deletion is best-effort: any error raised by the delete call (for example
    because the blob does not exist) is ignored.
    """

    def _purge_test_blob() -> None:
        # Failure to delete must never fail or block the test module.
        try:
            blobs.delete_blob(office_id=TEST_OFFICE, blob_id=TEST_BLOB_ID)
        except Exception:
            pass

    _purge_test_blob()
    yield
    _purge_test_blob()


@pytest.fixture(autouse=True)
def init_session(request):
    """Print a start-up notice before each test; ``request`` is accepted but not used."""
    notice = "Initializing CWMS API session for blob tests..."
    print(notice)


def _find_blob_row(office: str, blob_id: str) -> Optional[pd.Series]:
    """
    Helper: look up ``blob_id`` in the listing returned by ``blobs.get_blobs``.

    The listing is used directly when it is a DataFrame; otherwise its ``df``
    attribute is used. Ids are compared case-insensitively.

    Returns the first matching row, or None when the listing is missing or
    empty, or when no row matches.
    """
    listing = blobs.get_blobs(office_id=office, blob_id_like=blob_id)
    if isinstance(listing, pd.DataFrame):
        frame = listing
    else:
        frame = getattr(listing, "df", None)

    if frame is None or frame.empty:
        return None

    # The id column may be called "id" or "blob-id"; settle on "id".
    columns = frame.columns
    if "id" not in columns and "blob-id" in columns:
        frame = frame.rename(columns={"blob-id": "id"})

    upper_ids = frame["id"].str.upper()
    hits = frame[upper_ids == blob_id.upper()]
    if hits.empty:
        return None
    return hits.iloc[0]


def test_store_blob_excel():
    """Store an Excel workbook as a base64 blob and check that it shows up in the listing."""
    resources_dir = Path(__file__).parent.parent / "resources"
    excel_file_path = resources_dir / "blob_test.xlsx"
    file_data = excel_file_path.read_bytes()
    mime_type, _ = mimetypes.guess_type(excel_file_path)

    excel_blob_id = "/TEST/BLOB/EXCEL"
    encoded_value = base64.b64encode(file_data).decode("utf-8")
    blobs.store_blobs(
        data={
            "office-id": TEST_OFFICE,
            "id": excel_blob_id,
            "description": "testing excel file",
            "media-type-id": mime_type,
            "value": encoded_value,
        }
    )

    try:
        listed = _find_blob_row(TEST_OFFICE, excel_blob_id)
        assert listed is not None, "Stored blob not found in listing"
    finally:
        # Always remove the workbook blob, whether or not the assertion held.
        blobs.delete_blob(blob_id=excel_blob_id, office_id=TEST_OFFICE)


def test_store_blob():
    """Store the module's text blob, then check its listing metadata and downloaded content."""
    blobs.store_blobs(
        {
            "office-id": TEST_OFFICE,
            "id": TEST_BLOB_ID,
            "description": TEST_DESC,
            "media-type-id": TEST_MEDIA_TYPE,
            "value": TEST_TEXT,
        },
        fail_if_exists=True,
    )

    # The blob must appear in the listing under the id it was stored with.
    listed = _find_blob_row(TEST_OFFICE, TEST_BLOB_ID)
    assert listed is not None, "Stored blob not found in listing"
    assert str(listed["id"]).upper() == TEST_BLOB_ID

    # These columns are only checked when the listing actually provides them.
    available = listed.index
    if "media-type-id" in available:
        assert listed["media-type-id"] == TEST_MEDIA_TYPE
    if "description" in available:
        assert TEST_DESC in str(listed["description"])

    # Downloading the blob must give back the stored text.
    downloaded = blobs.get_blob(office_id=TEST_OFFICE, blob_id=TEST_BLOB_ID)
    assert isinstance(downloaded, str) and downloaded, "Empty blob content"
    assert TEST_TEXT in downloaded


def test_get_blob():
    """Read back the blob created in test_store_blob and check that it holds the stored text."""
    fetched = blobs.get_blob(blob_id=TEST_BLOB_ID, office_id=TEST_OFFICE)
    assert TEST_TEXT in fetched
    assert len(fetched) >= len(TEST_TEXT)


def test_update_blob():
    """Update every field of the test blob, then check the new description and content."""
    blobs.update_blob(
        {
            "office-id": TEST_OFFICE,
            "id": TEST_BLOB_UPDATED_ID,
            "description": TEST_DESC_UPDATED,
            "media-type-id": TEST_MEDIA_TYPE,
            "value": TEST_TEXT_UPDATED,
        },
        fail_if_not_exists=True,
    )

    # The listing should now carry the updated description, when it has that column.
    listed = _find_blob_row(TEST_OFFICE, TEST_BLOB_UPDATED_ID)
    assert listed is not None, "Updated blob not found"
    if "description" in listed.index:
        assert TEST_DESC_UPDATED in str(listed["description"])

    # The downloaded content should be the edited text.
    downloaded = blobs.get_blob(office_id=TEST_OFFICE, blob_id=TEST_BLOB_UPDATED_ID)
    assert TEST_TEXT_UPDATED in downloaded
Loading
Loading