Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions igf_airflow/utils/dag49_cosmx_metadata_registration_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import logging
from datetime import timedelta
from airflow.models import Variable
from airflow.decorators import task
from airflow.operators.python import get_current_context
from igf_airflow.utils.generic_airflow_utils import (
send_airflow_failed_logs_to_channels
)
from igf_data.process.seqrun_processing.unified_metadata_registration import (
UnifiedMetadataRegistration
)

log = logging.getLogger(__name__)

MS_TEAMS_CONF = Variable.get(
'analysis_ms_teams_conf',
default_var=None
)
DATABASE_CONFIG_FILE = Variable.get(
'database_config_file',
default_var=None
)
IGF_PORTAL_CONF = Variable.get(
'igf_portal_conf',
default_var=None
)
FETCH_METADATA_URL_SUFFIX = Variable.get(
'cosmx_metadata_fetch_metadata_url',
default_var='/api/v1/raw_cosmx_metadata/get_raw_metadata/'
)
SYNC_METADATA_URL_SUFFIX = Variable.get(
'cosmx_metadata_sync_metadata_url',
default_var='/api/v1/raw_cosmx_metadata/mark_ready_metadata_as_synced'
)
METADATA_VALIDATION_SCHEMA = Variable.get(
'igf_portal_conf',
default_var=None
)
DEFAULT_EMAIL = Variable.get(
'default_email',
default_var=None
)

## TASK - find raw metadata id in datrun.conf
@task(
task_id="find_raw_metadata_id",
retry_delay=timedelta(minutes=5),
retries=4,
queue='hpc_4G'
)
def find_raw_metadata_id(
raw_cosmx_metadata_id_tag: str = "raw_cosmx_metadata_id",
dag_run_key: str = "dag_run"
) -> int:
try:
### dag_run.conf should have raw_cosmx_metadata_id
context = get_current_context()
dag_run = context.get(dag_run_key)
raw_cosmx_metadata_id = None
if (
dag_run is not None
and dag_run.conf is not None
and dag_run.conf.get(raw_cosmx_metadata_id_tag) is not None
):
raw_cosmx_metadata_id = (
dag_run.conf
.get(raw_cosmx_metadata_id_tag)
)
if raw_cosmx_metadata_id is None:
raise ValueError(
'raw_analysis_id not found in dag_run.conf'
)
return int(raw_cosmx_metadata_id)
except Exception as e:
message = f"Failed to get raw_analysis_id, error: {e}"
log.error(message)
send_airflow_failed_logs_to_channels(
ms_teams_conf=MS_TEAMS_CONF,
message_prefix=str(message))
raise ValueError(message)

@task(
task_id="register_cosmx_metadata",
retry_delay=timedelta(minutes=5),
retries=4,
queue='hpc_4G'
)
def register_cosmx_metadata(
raw_cosmx_metadata_id: int
) -> None:
try:
metadata_registration = UnifiedMetadataRegistration(
raw_cosmx_metadata_id=raw_cosmx_metadata_id,
portal_config_file=IGF_PORTAL_CONF,
fetch_metadata_url_suffix=FETCH_METADATA_URL_SUFFIX,
sync_metadata_url_suffix=SYNC_METADATA_URL_SUFFIX,
metadata_validation_schema=METADATA_VALIDATION_SCHEMA,
db_config_file=DATABASE_CONFIG_FILE,
default_project_user_email=DEFAULT_EMAIL,
samples_required=False
)
metadata_registration.execute()
except Exception as e:
message = f"Failed to register new cosmx metadata, error: {e}"
log.error(message)
send_airflow_failed_logs_to_channels(
ms_teams_conf=MS_TEAMS_CONF,
message_prefix=str(message)
)
raise ValueError(message)
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def _get_db_session_class(db_config_file: str) -> Any:
class MetadataContext:
def __init__(
self,
raw_cosmx_metadata_id: int,
portal_config_file: str,
fetch_metadata_url_suffix: str,
sync_metadata_url_suffix: str,
Expand All @@ -57,6 +58,7 @@ def __init__(
"sample": ["sample_igf_id", "project_igf_id"]},
error_list: List[str] = []) \
-> None:
self.raw_cosmx_metadata_id = raw_cosmx_metadata_id
self.portal_config_file = portal_config_file
self.fetch_metadata_url_suffix = fetch_metadata_url_suffix
self.sync_metadata_url_suffix = sync_metadata_url_suffix
Expand Down Expand Up @@ -90,9 +92,13 @@ def execute(self, metadata_context: MetadataContext) -> None:
try:
portal_config_file = metadata_context.portal_config_file
fetch_metadata_url_suffix = metadata_context.fetch_metadata_url_suffix
fetch_url = urljoin(
fetch_metadata_url_suffix,
str(metadata_context.raw_cosmx_metadata_id)
)
new_project_data_dict = \
get_data_from_portal(
url_suffix=fetch_metadata_url_suffix,
url_suffix=fetch_url,
portal_config_file=portal_config_file)
if len(new_project_data_dict) > 0:
reformatted_project_data_dict = \
Expand Down Expand Up @@ -619,6 +625,7 @@ def execute(self, metadata_context: MetadataContext) -> None:
class UnifiedMetadataRegistration:
def __init__(
self,
raw_cosmx_metadata_id: int,
portal_config_file: str,
fetch_metadata_url_suffix: str,
sync_metadata_url_suffix: str,
Expand All @@ -628,6 +635,7 @@ def __init__(
samples_required: bool = False,
) -> None:
self.metadata_context = MetadataContext(
raw_cosmx_metadata_id=raw_cosmx_metadata_id,
portal_config_file=portal_config_file,
fetch_metadata_url_suffix=fetch_metadata_url_suffix,
sync_metadata_url_suffix=sync_metadata_url_suffix,
Expand Down
6 changes: 5 additions & 1 deletion test/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,9 @@ def full_suite():
Test_dag46_scRNA_10X_flex_utilsA)
from .igf_airflow.test_dag44_analysis_registration_utils import (
Test_dag44_analysis_registration_utilsA)
from .igf_airflow.test_dag49_cosmx_metadata_registration_utils import (
Test_dag49_cosmx_metadata_registration_utilsA
)


return unittest.TestSuite([
Expand Down Expand Up @@ -372,6 +375,7 @@ def full_suite():
TestUnifiedMetadataRegistrationB,
Test_dag46_scRNA_10X_flex_utilsA,
Test_dag45_metadata_registration_utilsA,
Test_dag44_analysis_registration_utilsA
Test_dag44_analysis_registration_utilsA,
Test_dag49_cosmx_metadata_registration_utilsA
]
])
44 changes: 44 additions & 0 deletions test/igf_airflow/test_dag49_cosmx_metadata_registration_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import unittest
from igf_data.utils.fileutils import (
get_temp_dir,
remove_dir
)
from unittest.mock import patch, MagicMock
from igf_airflow.utils.dag49_cosmx_metadata_registration_utils import (
find_raw_metadata_id,
register_cosmx_metadata
)

class Test_dag49_cosmx_metadata_registration_utilsA(unittest.TestCase):
def setUp(self):
self.temp_dir = get_temp_dir()

def tearDown(self):
remove_dir(self.temp_dir)

@patch("igf_airflow.utils.dag49_cosmx_metadata_registration_utils.get_current_context")
def test_find_raw_metadata_id(self, mock_get_context):
mock_context = MagicMock()
mock_context.dag_run.conf.raw_cosmx_metadata_id = 1
mock_context.get.return_value = mock_context.dag_run
mock_context.dag_run.conf.get.return_value = 1
mock_get_context.return_value = mock_context
raw_cosmx_metadata_id = find_raw_metadata_id.function()
assert raw_cosmx_metadata_id == 1

@patch("igf_airflow.utils.dag49_cosmx_metadata_registration_utils.DATABASE_CONFIG_FILE", "test.conf")
@patch("igf_airflow.utils.dag49_cosmx_metadata_registration_utils.IGF_PORTAL_CONF", "test.conf")
@patch("igf_airflow.utils.dag49_cosmx_metadata_registration_utils.METADATA_VALIDATION_SCHEMA", "test.json")
@patch("igf_airflow.utils.dag49_cosmx_metadata_registration_utils.DEFAULT_EMAIL", "c@c.org")
@patch("igf_airflow.utils.dag49_cosmx_metadata_registration_utils.UnifiedMetadataRegistration")
def test_register_cosmx_metadata(self, mock_class, *args):
mock_instance = MagicMock()
mock_instance.execute.return_value = None
mock_class.return_value = mock_instance
register_cosmx_metadata.function(
raw_cosmx_metadata_id=1
)
mock_instance.execute.assert_called_once()

if __name__=='__main__':
unittest.main()
11 changes: 11 additions & 0 deletions test/process/unified_metadata_registration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

class TestUnifiedMetadataRegistrationA(unittest.TestCase):
def setUp(self):
self.raw_cosmx_metadata_id = 1
self.portal_config_file = "test_config.json"
self.fetch_metadata_url_suffix = "test_fetch_suffix"
self.sync_metadata_url_suffix = "test_sync_suffix"
Expand Down Expand Up @@ -79,6 +80,7 @@ def tearDown(self):

def test_MetadataContext(self):
metadata_context = MetadataContext(
raw_cosmx_metadata_id=self.raw_cosmx_metadata_id,
portal_config_file=self.portal_config_file,
fetch_metadata_url_suffix=self.fetch_metadata_url_suffix,
sync_metadata_url_suffix=self.sync_metadata_url_suffix,
Expand All @@ -93,6 +95,7 @@ def test_FetchNewMetadataCommand1(self, *args):
fetch_command = FetchNewMetadataCommand()
self.assertIsInstance(fetch_command, FetchNewMetadataCommand)
metadata_context = MetadataContext(
raw_cosmx_metadata_id=self.raw_cosmx_metadata_id,
portal_config_file=self.portal_config_file,
fetch_metadata_url_suffix=self.fetch_metadata_url_suffix,
sync_metadata_url_suffix=self.sync_metadata_url_suffix,
Expand All @@ -109,6 +112,7 @@ def test_FetchNewMetadataCommand2(self, *args):
fetch_command = FetchNewMetadataCommand()
self.assertIsInstance(fetch_command, FetchNewMetadataCommand)
metadata_context = MetadataContext(
raw_cosmx_metadata_id=self.raw_cosmx_metadata_id,
portal_config_file=self.portal_config_file,
fetch_metadata_url_suffix=self.fetch_metadata_url_suffix,
sync_metadata_url_suffix=self.sync_metadata_url_suffix,
Expand Down Expand Up @@ -163,6 +167,7 @@ def test_CheckRawMetadataColumnsCommand_check_columns(self):

def test_CheckRawMetadataColumnsCommand_execute(self):
metadata_context = MetadataContext(
raw_cosmx_metadata_id=self.raw_cosmx_metadata_id,
portal_config_file=self.portal_config_file,
fetch_metadata_url_suffix=self.fetch_metadata_url_suffix,
sync_metadata_url_suffix=self.sync_metadata_url_suffix,
Expand All @@ -187,6 +192,7 @@ def test_CheckRawMetadataColumnsCommand_execute(self):
self.assertEqual(metadata_context.checked_required_column_dict.get(2), False)
self.assertEqual(len(metadata_context.error_list), 1)
metadata_context = MetadataContext(
raw_cosmx_metadata_id=self.raw_cosmx_metadata_id,
portal_config_file=self.portal_config_file,
fetch_metadata_url_suffix=self.fetch_metadata_url_suffix,
sync_metadata_url_suffix=self.sync_metadata_url_suffix,
Expand Down Expand Up @@ -245,6 +251,7 @@ def test_ValidateMetadataCommand_validate_metadata(self):
def test_ValidateMetadataCommand_execute(self):
validate = ValidateMetadataCommand()
metadata_context = MetadataContext(
raw_cosmx_metadata_id=self.raw_cosmx_metadata_id,
portal_config_file=self.portal_config_file,
fetch_metadata_url_suffix=self.fetch_metadata_url_suffix,
sync_metadata_url_suffix=self.sync_metadata_url_suffix,
Expand Down Expand Up @@ -275,6 +282,7 @@ def test_ValidateMetadataCommand_execute(self):

def test_CheckAndRegisterMetadataCommand_split_metadata(self):
metadata_context = MetadataContext(
raw_cosmx_metadata_id=self.raw_cosmx_metadata_id,
portal_config_file=self.portal_config_file,
fetch_metadata_url_suffix=self.fetch_metadata_url_suffix,
sync_metadata_url_suffix=self.sync_metadata_url_suffix,
Expand Down Expand Up @@ -489,6 +497,7 @@ def test_CheckAndRegisterMetadataCommand_execute(self):
check_and_register = \
CheckAndRegisterMetadataCommand()
metadata_context = MetadataContext(
raw_cosmx_metadata_id=self.raw_cosmx_metadata_id,
portal_config_file=self.portal_config_file,
fetch_metadata_url_suffix=self.fetch_metadata_url_suffix,
sync_metadata_url_suffix=self.sync_metadata_url_suffix,
Expand Down Expand Up @@ -567,6 +576,7 @@ def test_CheckAndRegisterMetadataCommand_execute(self):
@patch("igf_data.process.seqrun_processing.unified_metadata_registration.get_data_from_portal", return_value=True)
def test_SyncMetadataCommand_execute(self, *args):
metadata_context = MetadataContext(
raw_cosmx_metadata_id=self.raw_cosmx_metadata_id,
portal_config_file=self.portal_config_file,
fetch_metadata_url_suffix=self.fetch_metadata_url_suffix,
sync_metadata_url_suffix=self.sync_metadata_url_suffix,
Expand Down Expand Up @@ -623,6 +633,7 @@ def tearDown(self):
3: [{"project_igf_id": "IGFA003", "deliverable": "COSMX", "name": "User DQ", "email_id": "a-c.com", "username": "ddd"}]})
def test_UnifiedMetadataRegistration_execute(self, *args):
metadata_registration = UnifiedMetadataRegistration(
raw_cosmx_metadata_id=1,
portal_config_file=self.portal_config_file,
fetch_metadata_url_suffix=self.fetch_metadata_url_suffix,
sync_metadata_url_suffix=self.sync_metadata_url_suffix,
Expand Down