From 449affefe6b2e97d390aa3456a812cc812630084 Mon Sep 17 00:00:00 2001 From: Wai Phyo Date: Tue, 7 Oct 2025 10:21:57 -0700 Subject: [PATCH 1/2] feat: add logic to fastapi update --- .../daac_archiver/daac_archiver_logic.py | 76 ++++++++++++++----- requirements.txt | 7 +- 2 files changed, 62 insertions(+), 21 deletions(-) diff --git a/cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py b/cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py index fbb910d7..ab1a5c2f 100644 --- a/cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py +++ b/cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py @@ -9,6 +9,7 @@ from mdps_ds_lib.lib.aws.aws_message_transformers import AwsMessageTransformers from mdps_ds_lib.lib.utils.json_validator import JsonValidator +from mdps_ds_lib.stac_fast_api_client.sfa_client_factory import SFAClientFactory from cumulus_lambda_functions.lib.uds_db.granules_db_index import GranulesDbIndex from mdps_ds_lib.lib.aws.aws_sns import AwsSns @@ -147,25 +148,46 @@ def send_to_daac(self, event: dict): self.send_to_daac_internal(uds_cnm_json) return - def receive_from_daac(self, event: dict): - LOGGER.debug(f'receive_from_daac#event: {event}') - sns_msg = AwsMessageTransformers().sqs_sns(event) - LOGGER.debug(f'sns_msg: {sns_msg}') - cnm_notification_msg = sns_msg + def update_stac(self, cnm_notification_msg): + update_type = os.getenv('ARCHIVAL_STATUS_MECHANISM', '') + if not any([k for k in ['UDS', 'FAST_STAC'] if k == update_type]): + raise ValueError(f"missing ARCHIVAL_STATUS_MECHANISM environment variable or value is not {['UDS', 'FAST_STAC']}") + if update_type == 'UDS': + return self.update_stac_uds(cnm_notification_msg) + return self.update_stac_fast_api(cnm_notification_msg) - cnm_msg_schema = requests.get('https://raw.githubusercontent.com/podaac/cloud-notification-message-schema/v1.6.1/cumulus_sns_schema.json') - cnm_msg_schema.raise_for_status() - cnm_msg_schema = json.loads(cnm_msg_schema.text) - result = JsonValidator(cnm_msg_schema).validate(cnm_notification_msg) - if result is not None: - raise ValueError(f'input cnm event has cnm_msg_schema validation errors: {result}') - if 'response' not in cnm_notification_msg: - raise ValueError(f'missing response in {cnm_notification_msg}') + def update_stac_fast_api(self, cnm_notification_msg): + sfa_client = SFAClientFactory().get_instance_from_env() + collection_id, granule_id = ':'.join(cnm_notification_msg['identifier'].split(':')[:-1]), cnm_notification_msg['identifier'] + # TODO assuming granule ID is URN:NASA:VENUE:TENANT:VENUE:COLLECTION_ID:COLLECTION_ID + existing_item = sfa_client.get_item(collection_id, granule_id) + # TODO handle error when no existing_item. Currently, it is requests.HTTPError with 404 + if cnm_notification_msg['response']['status'] == 'SUCCESS': + latest_daac_status = { + 'archive_status': 'cnm_r_success', + 'archive_error_message': '', + 'archive_error_code': '', + } + else: + latest_daac_status = { + 'archive_status': 'cnm_r_failed', + 'archive_error_message': cnm_notification_msg['response']['errorMessage'] if 'errorMessage' in cnm_notification_msg['response'] else 'unknown', + 'archive_error_code': cnm_notification_msg['response']['errorCode'] if 'errorCode' in cnm_notification_msg['response'] else 'unknown', + } + latest_daac_status['event_time'] = TimeUtils.get_current_time() + existing_item['properties']['archival_statuses'] = existing_item['properties']['archival_statuses'] + [latest_daac_status] if 'archival_statuses' in existing_item['properties'] else [latest_daac_status] + updated_item = sfa_client.update_item(collection_id, granule_id, existing_item, update_whole=True) # TODO partial update via patch is not working at this moment. + return + + def update_stac_uds(self, cnm_notification_msg): granule_identifier = UdsCollections.decode_identifier(cnm_notification_msg['identifier']) # This is normally meant to be for collection. Since our granule ID also has collection id prefix. we can use this. try: - existing_granule_object = self.__granules_index.get_entry(granule_identifier.tenant, granule_identifier.venue, cnm_notification_msg['identifier']) + existing_granule_object = self.__granules_index.get_entry(granule_identifier.tenant, + granule_identifier.venue, + cnm_notification_msg['identifier']) except Exception as e: - LOGGER.exception(f"error while attempting to retrieve existing record: {cnm_notification_msg['identifier']}, not continuing") + LOGGER.exception( + f"error while attempting to retrieve existing record: {cnm_notification_msg['identifier']}, not continuing") return LOGGER.debug(f'existing_granule_object: {existing_granule_object}') if cnm_notification_msg['response']['status'] == 'SUCCESS': @@ -177,7 +199,27 @@ def receive_from_daac(self, event: dict): return self.__granules_index.update_entry(granule_identifier.tenant, granule_identifier.venue, { 'archive_status': 'cnm_r_failed', - 'archive_error_message': cnm_notification_msg['response']['errorMessage'] if 'errorMessage' in cnm_notification_msg['response'] else 'unknown', - 'archive_error_code': cnm_notification_msg['response']['errorCode'] if 'errorCode' in cnm_notification_msg['response'] else 'unknown', + 'archive_error_message': cnm_notification_msg['response']['errorMessage'] if 'errorMessage' in + cnm_notification_msg[ + 'response'] else 'unknown', + 'archive_error_code': cnm_notification_msg['response']['errorCode'] if 'errorCode' in cnm_notification_msg[ + 'response'] else 'unknown', }, cnm_notification_msg['identifier']) return + + def receive_from_daac(self, event: dict): + LOGGER.debug(f'receive_from_daac#event: {event}') + sns_msg = AwsMessageTransformers().sqs_sns(event) + LOGGER.debug(f'sns_msg: {sns_msg}') + cnm_notification_msg = sns_msg + + cnm_msg_schema = requests.get('https://raw.githubusercontent.com/podaac/cloud-notification-message-schema/v1.6.1/cumulus_sns_schema.json') + cnm_msg_schema.raise_for_status() + cnm_msg_schema = json.loads(cnm_msg_schema.text) + result = JsonValidator(cnm_msg_schema).validate(cnm_notification_msg) + if result is not None: + raise ValueError(f'input cnm event has cnm_msg_schema validation errors: {result}') + if 'response' not in cnm_notification_msg: + raise ValueError(f'missing response in {cnm_notification_msg}') + self.update_stac(cnm_notification_msg) + return diff --git a/requirements.txt b/requirements.txt index 16bde7d1..870508d8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,7 +5,6 @@ certifi==2024.8.30 charset-normalizer==3.3.2 click==8.1.7 dateparser==1.2.0 -elasticsearch==7.13.4 exceptiongroup==1.2.2 fastapi==0.115.0 fastjsonschema==2.20.0 @@ -15,7 +14,7 @@ jsonschema==4.23.0 jsonschema-specifications==2023.12.1 lark==0.12.0 mangum==0.18.0 -mdps-ds-lib==1.1.1.dev800 +mdps-ds-lib==1.2.0.dev100 pydantic==2.9.2 pydantic_core==2.23.4 pygeofilter==0.2.4 @@ -26,7 +25,7 @@ python-dotenv==1.0.1 pytz==2024.2 referencing==0.35.1 regex==2024.9.11 -requests==2.31.0 +requests==2.32.5 requests-aws4auth==1.2.3 rpds-py==0.20.0 six==1.16.0 @@ -35,6 +34,6 @@ starlette==0.38.6 tenacity==8.2.3 typing_extensions==4.12.2 tzlocal==5.2 -urllib3==1.26.11 +urllib3==1.26.20 uvicorn==0.30.6 xmltodict==0.13.0 From f095c5a95ce28e10a0fb70c8c7f2c5e9c90b4d40 Mon Sep 17 00:00:00 2001 From: Wai Phyo Date: Tue, 7 Oct 2025 10:43:15 -0700 Subject: [PATCH 2/2] fix: update terraform --- tf-module/unity-cumulus/daac_archiver.tf | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tf-module/unity-cumulus/daac_archiver.tf b/tf-module/unity-cumulus/daac_archiver.tf index dc2931e1..3d805d9a 100644 --- a/tf-module/unity-cumulus/daac_archiver.tf +++ b/tf-module/unity-cumulus/daac_archiver.tf @@ -44,6 +44,13 @@ resource "aws_lambda_function" "daac_archiver_response" { LOG_LEVEL = var.log_level ES_URL = aws_elasticsearch_domain.uds-es.endpoint ES_PORT = 443 + ARCHIVAL_STATUS_MECHANISM = "UDS" # UDS or FAST_STAC + DS_URL = 'TODO' + SFA_USERNAME = 'TODO' + SFA_PASSWORD = 'TODO' + SFA_AUTH_KEY = 'TODO' + SFA_AUTH_VALUE = 'TODO' + SFA_BEARER_TOKEN = 'TODO' } }