diff --git a/cwms/api.py b/cwms/api.py index 56cac1c..cdb605a 100644 --- a/cwms/api.py +++ b/cwms/api.py @@ -29,6 +29,7 @@ import base64 import json import logging +from http import HTTPStatus from json import JSONDecodeError from typing import Any, Optional, cast @@ -72,9 +73,9 @@ class InvalidVersion(Exception): class ApiError(Exception): """CWMS Data Api Error. - This class is a light wrapper around a `requests.Response` object. Its primary purpose - is to generate an error message that includes the request URL and provide additional - information to the user to help them resolve the error. + Light wrapper around a response-like object (e.g., requests.Response or a + test stub with url, status_code, reason, and content attributes). Produces + a concise, single-line error message with an optional hint. """ def __init__(self, response: Response): @@ -91,23 +92,37 @@ def __str__(self) -> str: message += "." # Add additional context to help the user resolve the issue. - if hint := self.hint(): + hint = self.hint() + if hint: message += f" {hint}" - if content := self.response.content: - message += f" {content.decode('utf8')}" + # Optional content (decoded if bytes) + content = getattr(self.response, "content", None) + if content: + if isinstance(content, bytes): + try: + text = content.decode("utf-8", errors="replace") + except Exception: + text = repr(content) + else: + text = str(content) + message += f" {text}" return message def hint(self) -> str: - """Return a message with additional information on how to resolve the error.""" + """Return a short hint based on HTTP status code.""" + status = getattr(self.response, "status_code", None) - if self.response.status_code == 400: + if status == 429: + return "Too many requests made." + if status == 400: return "Check that your parameters are correct." - elif self.response.status_code == 404: + if status == 404: return "May be the result of an empty query." - else: - return "" + + # No hint for other codes + return "" def init_session( @@ -227,6 +242,12 @@ def _process_response(response: Response) -> Any: return response.text if content_type.startswith("image/"): return base64.b64encode(response.content).decode("utf-8") + # Handle excel content types + if content_type in [ + "application/vnd.ms-excel", + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + ]: + return response.content # Fallback for remaining content types return response.content.decode("utf-8") except JSONDecodeError as error: diff --git a/pyproject.toml b/pyproject.toml index b34c316..bf9fbdc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,7 +2,7 @@ name = "cwms-python" repository = "https://github.com/HydrologicEngineeringCenter/cwms-python" -version = "1.0.0" +version = "1.0.1" packages = [ diff --git a/tests/cda/blobs/blob_CDA_test.py b/tests/cda/blobs/blob_CDA_test.py index 62f8fa6..6e67606 100644 --- a/tests/cda/blobs/blob_CDA_test.py +++ b/tests/cda/blobs/blob_CDA_test.py @@ -59,10 +59,20 @@ def _find_blob_row(office: str, blob_id: str) -> Optional[pd.Series]: def test_store_blob_excel(): + # Create an empty file with the excel extension excel_file_path = Path(__file__).parent.parent / "resources" / "blob_test.xlsx" with open(excel_file_path, "rb") as f: file_data = f.read() - mime_type, _ = mimetypes.guess_type(excel_file_path) + + # Get the file extension and decide which type to use if xlsx or xlx + ext = excel_file_path.suffix.lower() + mime_type = mimetypes.guess_type(excel_file_path.name)[0] + # Some linux systems may not have the excel mimetypes registered + if ext == ".xlsx": + mime_type = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" + elif ext == ".xls": + mime_type = "application/vnd.ms-excel" + excel_blob_id = "TEST_BLOB_EXCEL" payload = { "office-id": TEST_OFFICE, @@ -72,12 +82,16 @@ def test_store_blob_excel(): "value": base64.b64encode(file_data).decode("utf-8"), } blobs.store_blobs(data=payload) - try: - row = _find_blob_row(TEST_OFFICE, excel_blob_id) - assert row is not None, "Stored blob not found in listing" - finally: - # Cleanup excel - blobs.delete_blob(blob_id=excel_blob_id, office_id=TEST_OFFICE) + row = _find_blob_row(TEST_OFFICE, excel_blob_id) + assert row is not None, "Stored blob not found in listing" + + +def test_get_excel_blob(): + # Retrieve the excel blob stored in the previous test + excel_blob_id = "TEST_BLOB_EXCEL" + content = blobs.get_blob(office_id=TEST_OFFICE, blob_id=excel_blob_id) + assert content is not None, "Failed to retrieve excel blob" + assert len(content) > 0, "Excel blob content is empty" def test_store_blob(): @@ -133,3 +147,16 @@ def test_update_blob(): # Verify new content content = blobs.get_blob(office_id=TEST_OFFICE, blob_id=TEST_BLOB_UPDATED_ID) assert TEST_TEXT_UPDATED in content + + +def test_delete_blobs(): + # Delete the test blob + blobs.delete_blob(office_id=TEST_OFFICE, blob_id=TEST_BLOB_ID) + blobs.delete_blob(office_id=TEST_OFFICE, blob_id="TEST_BLOB_EXCEL") + + # Confirm deletion via listing + row = _find_blob_row(TEST_OFFICE, TEST_BLOB_ID) + assert row is None, "Blob still found after deletion" + + row = _find_blob_row(TEST_OFFICE, "TEST_BLOB_EXCEL") + assert row is None, "Excel blob still found after deletion"