From f6fbf245ee1283fe54b82aa3f95a625fb7083955 Mon Sep 17 00:00:00 2001 From: Gaurav Vaidya Date: Mon, 21 Apr 2025 16:52:23 -0400 Subject: [PATCH 1/8] Added normalization logging. This is to track how quickly NodeNorm normalizes identifiers, and also which identifiers are being normalized. Also removed an unnecessary catch-everything block. As per https://github.com/TranslatorSRI/NodeNormalization/issues/277#issuecomment-2819401676 --- node_normalizer/normalizer.py | 198 +++++++++++++++++----------------- node_normalizer/server.py | 2 +- 2 files changed, 102 insertions(+), 98 deletions(-) diff --git a/node_normalizer/normalizer.py b/node_normalizer/normalizer.py index fa1ed61..333d16f 100644 --- a/node_normalizer/normalizer.py +++ b/node_normalizer/normalizer.py @@ -3,6 +3,8 @@ from pathlib import Path import json as builtin_json +import time + import orjson as json import logging import os @@ -545,6 +547,10 @@ async def get_normalized_nodes( """ Get value(s) for key(s) using redis MGET """ + + # Time how long this query takes. 
+ start_time = time.time_ns() + # malkovich malkovich curies = [ curie.__root__ if isinstance(curie, CURIE) else curie @@ -557,105 +563,103 @@ async def get_normalized_nodes( # conflation_redis = 5 upper_curies = [c.upper() for c in curies] - try: - canonical_ids = await app.state.eq_id_to_id_db.mget(*upper_curies, encoding='utf-8') - canonical_nonan = [canonical_id for canonical_id in canonical_ids if canonical_id is not None] - info_contents = {} - - # did we get some canonical ids - if canonical_nonan: - # get the information content values - info_contents = await get_info_content(app, canonical_nonan) - - # Get the equivalent_ids and types - eqids, types = await get_eqids_and_types(app, canonical_nonan) - - # are we looking for conflated values - if conflate_gene_protein or conflate_chemical_drug: - other_ids = [] - - if conflate_gene_protein: - other_ids.extend(await app.state.gene_protein_db.mget(*canonical_nonan, encoding='utf8')) - - # logger.error(f"After conflate_gene_protein: {other_ids}") - - if conflate_chemical_drug: - other_ids.extend(await app.state.chemical_drug_db.mget(*canonical_nonan, encoding='utf8')) - - # logger.error(f"After conflate_chemical_drug: {other_ids}") - - # if there are other ids, then we want to rebuild eqids and types. That's because even though we have them, - # they're not necessarily first. For instance if what came in and got canonicalized was a protein id - # and we want gene first, then we're relying on the order of the other_ids to put it back in the right place. - other_ids = [json.loads(oids) if oids else [] for oids in other_ids] - - # Until we added conflate_chemical_drug, canonical_nonan and other_ids would always have the same - # length, so we could figure out mappings from one to the other just by doing: - # dereference_others = dict(zip(canonical_nonan, other_ids)) - # Now that we have (potentially multiple) results to associate with each identifier, we need - # something a bit more sophisticated. 
- # - We use a defaultdict with set so that we can deduplicate identifiers here. - # - We use itertools.cycle() because len(canonical_nonan) will be <= len(other_ids), but we can be sure - # that each conflation method will return a list of identifiers (e.g. if gene_conflation returns nothing - # for two queries, other_ids = [[], [], ...]. By cycling through canonical_nonan, we can assign each - # result to the correct query for each conflation method. - dereference_others = collections.defaultdict(list) - for canon, oids in zip(itertools.cycle(canonical_nonan), other_ids): - dereference_others[canon].extend(oids) - - all_other_ids = sum(other_ids, []) - eqids2, types2 = await get_eqids_and_types(app, all_other_ids) - - # logger.error(f"other_ids = {other_ids}") - # logger.error(f"dereference_others = {dereference_others}") - # logger.error(f"all_other_ids = {all_other_ids}") - - final_eqids = [] - final_types = [] - - deref_others_eqs = dict(zip(all_other_ids, eqids2)) - deref_others_typ = dict(zip(all_other_ids, types2)) - - zipped = zip(canonical_nonan, eqids, types) - - for canonical_id, e, t in zipped: - # here's where we replace the eqids, types - if len(dereference_others[canonical_id]) > 0: - e = [] - t = [] - - for other in dereference_others[canonical_id]: - # logging.debug(f"e = {e}, other = {other}, deref_others_eqs = {deref_others_eqs}") - e += deref_others_eqs[other] - t += deref_others_typ[other] - - final_eqids.append(e) - final_types.append(uniquify_list(t)) - - dereference_ids = dict(zip(canonical_nonan, final_eqids)) - dereference_types = dict(zip(canonical_nonan, final_types)) - else: - dereference_ids = dict(zip(canonical_nonan, eqids)) - dereference_types = dict(zip(canonical_nonan, types)) + canonical_ids = await app.state.eq_id_to_id_db.mget(*upper_curies, encoding='utf-8') + canonical_nonan = [canonical_id for canonical_id in canonical_ids if canonical_id is not None] + info_contents = {} + + # did we get some canonical ids + if 
canonical_nonan: + # get the information content values + info_contents = await get_info_content(app, canonical_nonan) + + # Get the equivalent_ids and types + eqids, types = await get_eqids_and_types(app, canonical_nonan) + + # are we looking for conflated values + if conflate_gene_protein or conflate_chemical_drug: + other_ids = [] + + if conflate_gene_protein: + other_ids.extend(await app.state.gene_protein_db.mget(*canonical_nonan, encoding='utf8')) + + # logger.error(f"After conflate_gene_protein: {other_ids}") + + if conflate_chemical_drug: + other_ids.extend(await app.state.chemical_drug_db.mget(*canonical_nonan, encoding='utf8')) + + # logger.error(f"After conflate_chemical_drug: {other_ids}") + + # if there are other ids, then we want to rebuild eqids and types. That's because even though we have them, + # they're not necessarily first. For instance if what came in and got canonicalized was a protein id + # and we want gene first, then we're relying on the order of the other_ids to put it back in the right place. + other_ids = [json.loads(oids) if oids else [] for oids in other_ids] + + # Until we added conflate_chemical_drug, canonical_nonan and other_ids would always have the same + # length, so we could figure out mappings from one to the other just by doing: + # dereference_others = dict(zip(canonical_nonan, other_ids)) + # Now that we have (potentially multiple) results to associate with each identifier, we need + # something a bit more sophisticated. + # - We use a defaultdict with set so that we can deduplicate identifiers here. + # - We use itertools.cycle() because len(canonical_nonan) will be <= len(other_ids), but we can be sure + # that each conflation method will return a list of identifiers (e.g. if gene_conflation returns nothing + # for two queries, other_ids = [[], [], ...]. By cycling through canonical_nonan, we can assign each + # result to the correct query for each conflation method. 
+ dereference_others = collections.defaultdict(list) + for canon, oids in zip(itertools.cycle(canonical_nonan), other_ids): + dereference_others[canon].extend(oids) + + all_other_ids = sum(other_ids, []) + eqids2, types2 = await get_eqids_and_types(app, all_other_ids) + + # logger.error(f"other_ids = {other_ids}") + # logger.error(f"dereference_others = {dereference_others}") + # logger.error(f"all_other_ids = {all_other_ids}") + + final_eqids = [] + final_types = [] + + deref_others_eqs = dict(zip(all_other_ids, eqids2)) + deref_others_typ = dict(zip(all_other_ids, types2)) + + zipped = zip(canonical_nonan, eqids, types) + + for canonical_id, e, t in zipped: + # here's where we replace the eqids, types + if len(dereference_others[canonical_id]) > 0: + e = [] + t = [] + + for other in dereference_others[canonical_id]: + # logging.debug(f"e = {e}, other = {other}, deref_others_eqs = {deref_others_eqs}") + e += deref_others_eqs[other] + t += deref_others_typ[other] + + final_eqids.append(e) + final_types.append(uniquify_list(t)) + + dereference_ids = dict(zip(canonical_nonan, final_eqids)) + dereference_types = dict(zip(canonical_nonan, final_types)) else: - dereference_ids = dict() - dereference_types = dict() - - # output the final result - normal_nodes = { - input_curie: await create_node(app, canonical_id, dereference_ids, dereference_types, info_contents, - include_descriptions=include_descriptions, - include_individual_types=include_individual_types, - conflations={ - 'GeneProtein': conflate_gene_protein, - 'DrugChemical': conflate_chemical_drug, - }) - for input_curie, canonical_id in zip(curies, canonical_ids) - } + dereference_ids = dict(zip(canonical_nonan, eqids)) + dereference_types = dict(zip(canonical_nonan, types)) + else: + dereference_ids = dict() + dereference_types = dict() + + # output the final result + normal_nodes = { + input_curie: await create_node(app, canonical_id, dereference_ids, dereference_types, info_contents, + 
include_descriptions=include_descriptions, + include_individual_types=include_individual_types, + conflations={ + 'GeneProtein': conflate_gene_protein, + 'DrugChemical': conflate_chemical_drug, + }) + for input_curie, canonical_id in zip(curies, canonical_ids) + } - except Exception as e: - exception_str = "".join(traceback.format_exc()) - logger.error(f'Exception: {exception_str}') + end_time = time.time_ns() + logger.info(f"Normalized {len(curies)} nodes in {(end_time - start_time)/1_000_000:.2f} ms: {sorted(curies)}") return normal_nodes diff --git a/node_normalizer/server.py b/node_normalizer/server.py index 3b8862c..f124eb2 100644 --- a/node_normalizer/server.py +++ b/node_normalizer/server.py @@ -281,7 +281,7 @@ async def get_normalized_node_handler( summary="Get the equivalent identifiers and semantic types for the curie(s) entered.", description="Returns the equivalent identifiers and semantic types for the curie(s). Use the `conflate` flag to choose whether to apply conflation.", ) -async def get_normalized_node_handler(curies: CurieList): +async def get_normalized_node_handler_post(curies: CurieList): """ Get value(s) for key(s) using redis MGET """ From f4e3ffd354e57e24c1ab38b8e8c37a12be55070c Mon Sep 17 00:00:00 2001 From: Gaurav Vaidya Date: Mon, 21 Apr 2025 16:56:10 -0400 Subject: [PATCH 2/8] Removed catch-all blocks from loaders. --- node_normalizer/load_compendia.py | 246 ++++++++++++++--------------- node_normalizer/load_conflation.py | 46 +++--- 2 files changed, 140 insertions(+), 152 deletions(-) diff --git a/node_normalizer/load_compendia.py b/node_normalizer/load_compendia.py index 7c6187f..4c5692d 100644 --- a/node_normalizer/load_compendia.py +++ b/node_normalizer/load_compendia.py @@ -84,48 +84,44 @@ async def load(compendia_files, block_size, dry_run) -> bool: # 5-X: conflation databases consisting of canonical_id -> (list of conflated canonical_ids) # Each of these databases corresponds to a particular conflation e.g. 
gene/protein or chemical/drug - try: - # get the list of files in the directory - types_prefixes_redis: redis_adapter.RedisConnection = await get_redis("curie_to_bl_type_db") - # for each file validate and process - - # check the validity of the files - for comp in compendia_files: - if not validate_compendia(comp): - logger.warning(f"Compendia file {comp} is invalid.") - return False - - for comp in compendia_files: - if not validate_compendia(comp): - logger.warning(f"Compendia file {comp} is invalid.") - return False - - - for comp in compendia_files: - # check the validity of the file - - if not validate_compendia(comp): - logger.warning(f"Compendia file {comp} is invalid.") - continue - - # try to load the file - loaded = await load_compendium(comp, block_size, dry_run) - semantic_types_redis_pipeline = types_prefixes_redis.pipeline() - # @TODO add meta data about files eg. checksum to this object - # semantic_types_redis_pipeline.set(f"file-{str(comp)}", json.dumps({"source_prefixes": self.source_prefixes})) - if dry_run: - response = await redis_adapter.RedisConnection.execute_pipeline(semantic_types_redis_pipeline) - if asyncio.coroutines.iscoroutine(response): - await response - # self.source_prefixes = {} - if not loaded: - logger.warning(f"Compendia file {comp} did not load.") - continue - # merge all semantic counts from other files / loaders - await merge_semantic_meta_data() - except Exception as e: - logger.error(f"Exception thrown in load(): {e}") - raise e + # get the list of files in the directory + types_prefixes_redis: redis_adapter.RedisConnection = await get_redis("curie_to_bl_type_db") + # for each file validate and process + + # check the validity of the files + for comp in compendia_files: + if not validate_compendia(comp): + logger.warning(f"Compendia file {comp} is invalid.") + return False + + for comp in compendia_files: + if not validate_compendia(comp): + logger.warning(f"Compendia file {comp} is invalid.") + return False + + + for 
comp in compendia_files: + # check the validity of the file + + if not validate_compendia(comp): + logger.warning(f"Compendia file {comp} is invalid.") + continue + + # try to load the file + loaded = await load_compendium(comp, block_size, dry_run) + semantic_types_redis_pipeline = types_prefixes_redis.pipeline() + # @TODO add meta data about files eg. checksum to this object + # semantic_types_redis_pipeline.set(f"file-{str(comp)}", json.dumps({"source_prefixes": self.source_prefixes})) + if dry_run: + response = await redis_adapter.RedisConnection.execute_pipeline(semantic_types_redis_pipeline) + if asyncio.coroutines.iscoroutine(response): + await response + # self.source_prefixes = {} + if not loaded: + logger.warning(f"Compendia file {comp} did not load.") + continue + # merge all semantic counts from other files / loaders + await merge_semantic_meta_data() # return to the caller return True @@ -237,97 +233,93 @@ def get_ancestors(input_type): # init a line counter line_counter: int = 0 - try: - term2id_redis: redis_adapter.RedisConnection = await get_redis("eq_id_to_id_db") - id2eqids_redis: redis_adapter.RedisConnection = await get_redis("id_to_eqids_db") - id2type_redis: redis_adapter.RedisConnection = await get_redis("id_to_type_db") - info_content_redis: redis_adapter.RedisConnection = await get_redis("info_content_db") - - term2id_pipeline = term2id_redis.pipeline() - id2eqids_pipeline = id2eqids_redis.pipeline() - id2type_pipeline = id2type_redis.pipeline() - info_content_pipeline = info_content_redis.pipeline() - - with open(compendium_filename, "r", encoding="utf-8") as compendium: - logger.info(f"Processing {compendium_filename}...") - - # for each line in the file - for line in compendium: - line_counter = line_counter + 1 - - # load the line into memory - instance: dict = json.loads(line) - - # save the identifier - # "The" identifier is the first one in the presorted identifiers list - identifier: str = instance["identifiers"][0]["i"] - - # We 
want to accumulate statistics for each implied type as well, though we are only keeping the - # leaf type in the file (and redis). so now is the time to expand. We'll regenerate the same - # list on output. - semantic_types = get_ancestors(instance["type"]) - - # for each semantic type in the list - for semantic_type in semantic_types: - # save the semantic type in a set to avoid duplicates - semantic_types.add(semantic_type) - - # create a source prefix if it has not been encountered - if source_prefixes.get(semantic_type) is None: - source_prefixes[semantic_type] = {} - - # go through each equivalent identifier in the data row - # each will be assigned the semantic type information - for equivalent_id in instance["identifiers"]: - # split the identifier to just get the data source out of the curie - source_prefix: str = equivalent_id["i"].split(":")[0] - - # save the source prefix if no already there - if source_prefixes[semantic_type].get(source_prefix) is None: - source_prefixes[semantic_type][source_prefix] = 1 - # else just increment the count for the semantic type/source - else: - source_prefixes[semantic_type][source_prefix] += 1 - - # equivalent_id might be an array, where the first element is - # the identifier, or it might just be a string. not worrying about that case yet. 
- equivalent_id = equivalent_id["i"] - term2id_pipeline.set(equivalent_id.upper(), identifier) - # term2id_pipeline.set(equivalent_id, identifier) - - id2eqids_pipeline.set(identifier, json.dumps(instance["identifiers"])) - id2type_pipeline.set(identifier, instance["type"]) - - # if there is information content add it to the cache - if "ic" in instance and instance["ic"] is not None: - info_content_pipeline.set(identifier, instance["ic"]) - - if not dry_run and line_counter % block_size == 0: - await redis_adapter.RedisConnection.execute_pipeline(term2id_pipeline) - await redis_adapter.RedisConnection.execute_pipeline(id2eqids_pipeline) - await redis_adapter.RedisConnection.execute_pipeline(id2type_pipeline) - await redis_adapter.RedisConnection.execute_pipeline(info_content_pipeline) - - # Pipeline executed create a new one error - term2id_pipeline = term2id_redis.pipeline() - id2eqids_pipeline = id2eqids_redis.pipeline() - id2type_pipeline = id2type_redis.pipeline() - info_content_pipeline = info_content_redis.pipeline() - - logger.info(f"{line_counter} {compendium_filename} lines processed") - - if not dry_run: + term2id_redis: redis_adapter.RedisConnection = await get_redis("eq_id_to_id_db") + id2eqids_redis: redis_adapter.RedisConnection = await get_redis("id_to_eqids_db") + id2type_redis: redis_adapter.RedisConnection = await get_redis("id_to_type_db") + info_content_redis: redis_adapter.RedisConnection = await get_redis("info_content_db") + + term2id_pipeline = term2id_redis.pipeline() + id2eqids_pipeline = id2eqids_redis.pipeline() + id2type_pipeline = id2type_redis.pipeline() + info_content_pipeline = info_content_redis.pipeline() + + with open(compendium_filename, "r", encoding="utf-8") as compendium: + logger.info(f"Processing {compendium_filename}...") + + # for each line in the file + for line in compendium: + line_counter = line_counter + 1 + + # load the line into memory + instance: dict = json.loads(line) + + # save the identifier + # "The" 
identifier is the first one in the presorted identifiers list
+            identifier: str = instance["identifiers"][0]["i"]
+
+            # We want to accumulate statistics for each implied type as well, though we are only keeping the
+            # leaf type in the file (and redis). so now is the time to expand. We'll regenerate the same
+            # list on output.
+            semantic_types = get_ancestors(instance["type"])
+
+            # for each semantic type in the list
+            for semantic_type in semantic_types:
+                # save the semantic type in a set to avoid duplicates
+                semantic_types.add(semantic_type)
+
+                # create a source prefix if it has not been encountered
+                if source_prefixes.get(semantic_type) is None:
+                    source_prefixes[semantic_type] = {}
+
+                # go through each equivalent identifier in the data row
+                # each will be assigned the semantic type information
+                for equivalent_id in instance["identifiers"]:
+                    # split the identifier to just get the data source out of the curie
+                    source_prefix: str = equivalent_id["i"].split(":")[0]
+
+                    # save the source prefix if not already there
+                    if source_prefixes[semantic_type].get(source_prefix) is None:
+                        source_prefixes[semantic_type][source_prefix] = 1
+                    # else just increment the count for the semantic type/source
+                    else:
+                        source_prefixes[semantic_type][source_prefix] += 1
+
+                    # equivalent_id might be an array, where the first element is
+                    # the identifier, or it might just be a string. not worrying about that case yet. 
+ equivalent_id = equivalent_id["i"] + term2id_pipeline.set(equivalent_id.upper(), identifier) + # term2id_pipeline.set(equivalent_id, identifier) + + id2eqids_pipeline.set(identifier, json.dumps(instance["identifiers"])) + id2type_pipeline.set(identifier, instance["type"]) + + # if there is information content add it to the cache + if "ic" in instance and instance["ic"] is not None: + info_content_pipeline.set(identifier, instance["ic"]) + + if not dry_run and line_counter % block_size == 0: await redis_adapter.RedisConnection.execute_pipeline(term2id_pipeline) await redis_adapter.RedisConnection.execute_pipeline(id2eqids_pipeline) await redis_adapter.RedisConnection.execute_pipeline(id2type_pipeline) await redis_adapter.RedisConnection.execute_pipeline(info_content_pipeline) - logger.info(f"{line_counter} {compendium_filename} total lines processed") + # Pipeline executed create a new one error + term2id_pipeline = term2id_redis.pipeline() + id2eqids_pipeline = id2eqids_redis.pipeline() + id2type_pipeline = id2type_redis.pipeline() + info_content_pipeline = info_content_redis.pipeline() + + logger.info(f"{line_counter} {compendium_filename} lines processed") + + if not dry_run: + await redis_adapter.RedisConnection.execute_pipeline(term2id_pipeline) + await redis_adapter.RedisConnection.execute_pipeline(id2eqids_pipeline) + await redis_adapter.RedisConnection.execute_pipeline(id2type_pipeline) + await redis_adapter.RedisConnection.execute_pipeline(info_content_pipeline) + + logger.info(f"{line_counter} {compendium_filename} total lines processed") - print(f"Done loading {compendium_filename}...") - except Exception as e: - logger.error(f"Exception thrown in load_compendium({compendium_filename}), line {line_counter}: {e}") - return False + print(f"Done loading {compendium_filename}...") # return to the caller return True diff --git a/node_normalizer/load_conflation.py b/node_normalizer/load_conflation.py index 795c6fd..e0d8b67 100644 --- 
a/node_normalizer/load_conflation.py +++ b/node_normalizer/load_conflation.py @@ -146,38 +146,34 @@ async def load_conflation(self, conflation: dict, block_size: int) -> bool: conflation_redis_connection_name = conflation["redis_db"] # init a line counter line_counter: int = 0 - try: - conflation_redis: RedisConnection = await self.get_redis(conflation_redis_connection_name) - conflation_pipeline = conflation_redis.pipeline() + conflation_redis: RedisConnection = await self.get_redis(conflation_redis_connection_name) + conflation_pipeline = conflation_redis.pipeline() - with open(f"{self._conflation_directory}/{conflation_file}", "r", encoding="utf-8") as cfile: - logger.info(f"Processing {conflation_file}...") + with open(f"{self._conflation_directory}/{conflation_file}", "r", encoding="utf-8") as cfile: + logger.info(f"Processing {conflation_file}...") - # for each line in the file - for line in cfile: - line_counter = line_counter + 1 + # for each line in the file + for line in cfile: + line_counter = line_counter + 1 - # load the line into memory - instance: dict = json.loads(line) - - for identifier in instance: - # We need to include the identifier in the list of identifiers so that we know its position - conflation_pipeline.set(identifier, line) + # load the line into memory + instance: dict = json.loads(line) - if self._test_mode != 1 and line_counter % block_size == 0: - await RedisConnection.execute_pipeline(conflation_pipeline) - # Pipeline executed create a new one error - conflation_pipeline = conflation_redis.pipeline() - logger.info(f"{line_counter} {conflation_file} lines processed") + for identifier in instance: + # We need to include the identifier in the list of identifiers so that we know its position + conflation_pipeline.set(identifier, line) - if self._test_mode != 1: + if self._test_mode != 1 and line_counter % block_size == 0: await RedisConnection.execute_pipeline(conflation_pipeline) - logger.info(f"{line_counter} {conflation_file} total 
lines processed") + # Pipeline executed create a new one error + conflation_pipeline = conflation_redis.pipeline() + logger.info(f"{line_counter} {conflation_file} lines processed") + + if self._test_mode != 1: + await RedisConnection.execute_pipeline(conflation_pipeline) + logger.info(f"{line_counter} {conflation_file} total lines processed") - print(f"Done loading {conflation_file}...") - except Exception as e: - logger.error(f"Exception thrown in load_conflation({conflation_file}), line {line_counter}: {e}") - return False + print(f"Done loading {conflation_file}...") # return to the caller return True From 6a30ea6b8c8789a6542f6a3063d3cc7bd3e8b6e7 Mon Sep 17 00:00:00 2001 From: Gaurav Vaidya Date: Mon, 21 Apr 2025 16:57:59 -0400 Subject: [PATCH 3/8] Removed catch-all from loader.py. --- node_normalizer/loader.py | 284 ++++++++++++++++++-------------------- 1 file changed, 136 insertions(+), 148 deletions(-) diff --git a/node_normalizer/loader.py b/node_normalizer/loader.py index 43d48c8..6c4e7a0 100644 --- a/node_normalizer/loader.py +++ b/node_normalizer/loader.py @@ -243,45 +243,41 @@ async def load(self, block_size) -> bool: if self._test_mode == 1: logger.debug(f"Test mode enabled. No data will be produced.") - try: - # get the list of files in the directory - compendia: list = self.get_compendia() - types_prefixes_redis: RedisConnection = await self.get_redis("curie_to_bl_type_db") - # did we get all the files - if len(compendia) == len(self._data_files): - # for each file validate and process - for comp in compendia: - # check the validity of the file - if self.validate_compendia(comp): - # try to load the file - loaded = await self.load_compendium(comp, block_size) - semantic_types_redis_pipeline = types_prefixes_redis.pipeline() - # @TODO add meta data about files eg. 
checksum to this object - semantic_types_redis_pipeline.set(f"file-{str(comp)}", json.dumps({"source_prefixes": self.source_prefixes})) - if self._test_mode != 1: - response = await RedisConnection.execute_pipeline(semantic_types_redis_pipeline) - if asyncio.coroutines.iscoroutine(response): - await response - self.source_prefixes = {} - if not loaded: - logger.warning(f"Compendia file {comp} did not load.") - continue - else: - logger.warning(f"Compendia file {comp} is invalid.") - continue - for conf in self._conflations: - loaded = await self.load_conflation(conf, block_size) + # get the list of files in the directory + compendia: list = self.get_compendia() + types_prefixes_redis: RedisConnection = await self.get_redis("curie_to_bl_type_db") + # did we get all the files + if len(compendia) == len(self._data_files): + # for each file validate and process + for comp in compendia: + # check the validity of the file + if self.validate_compendia(comp): + # try to load the file + loaded = await self.load_compendium(comp, block_size) + semantic_types_redis_pipeline = types_prefixes_redis.pipeline() + # @TODO add meta data about files eg. 
checksum to this object + semantic_types_redis_pipeline.set(f"file-{str(comp)}", json.dumps({"source_prefixes": self.source_prefixes})) + if self._test_mode != 1: + response = await RedisConnection.execute_pipeline(semantic_types_redis_pipeline) + if asyncio.coroutines.iscoroutine(response): + await response + self.source_prefixes = {} if not loaded: - logger.warning(f"Conflation file {conf} did not load.") + logger.warning(f"Compendia file {comp} did not load.") continue - # merge all semantic counts from other files / loaders - await self.merge_semantic_meta_data() - else: - logger.error(f"Error: 1 or more data files were incorrect") - ret_val = False - except Exception as e: - logger.error(f"Exception thrown in load(): {''.join(traceback.format_exc())}") - raise e + else: + logger.warning(f"Compendia file {comp} is invalid.") + continue + for conf in self._conflations: + loaded = await self.load_conflation(conf, block_size) + if not loaded: + logger.warning(f"Conflation file {conf} did not load.") + continue + # merge all semantic counts from other files / loaders + await self.merge_semantic_meta_data() + else: + logger.error(f"Error: 1 or more data files were incorrect") + ret_val = False # return to the caller return ret_val @@ -390,42 +386,38 @@ async def load_conflation(self, conflation: dict, block_size: int) -> bool: conflation_redis_connection_name = conflation["redis_db"] # init a line counter line_counter: int = 0 - try: - conflation_redis: RedisConnection = await self.get_redis(conflation_redis_connection_name) - conflation_pipeline = conflation_redis.pipeline() + conflation_redis: RedisConnection = await self.get_redis(conflation_redis_connection_name) + conflation_pipeline = conflation_redis.pipeline() - with open(f"{self._conflation_directory}/{conflation_file}", "r", encoding="utf-8") as cfile: - logger.info(f"Processing {conflation_file}...") + with open(f"{self._conflation_directory}/{conflation_file}", "r", encoding="utf-8") as cfile: + 
logger.info(f"Processing {conflation_file}...") - # for each line in the file - for line in cfile: - line_counter = line_counter + 1 + # for each line in the file + for line in cfile: + line_counter = line_counter + 1 - # load the line into memory - instance: dict = json.loads(line) + # load the line into memory + instance: dict = json.loads(line) - for identifier in instance: - # We need to include the identifier in the list of identifiers so that we know its position - conflation_pipeline.set(identifier, line) + for identifier in instance: + # We need to include the identifier in the list of identifiers so that we know its position + conflation_pipeline.set(identifier, line) - if self._test_mode != 1 and line_counter % block_size == 0: - await RedisConnection.execute_pipeline(conflation_pipeline) - # Pipeline executed create a new one error - conflation_pipeline = conflation_redis.pipeline() - logger.info(f"{line_counter} {conflation_file} lines processed") - - if self._test_mode != 1: + if self._test_mode != 1 and line_counter % block_size == 0: await RedisConnection.execute_pipeline(conflation_pipeline) - logger.info(f"{line_counter} {conflation_file} total lines processed") + # Pipeline executed create a new one error + conflation_pipeline = conflation_redis.pipeline() + logger.info(f"{line_counter} {conflation_file} lines processed") - # Fail if the file was empty. - if line_counter == 0: - raise RuntimeError(f"Conflation file {conflation_file} is empty.") + if self._test_mode != 1: + await RedisConnection.execute_pipeline(conflation_pipeline) + logger.info(f"{line_counter} {conflation_file} total lines processed") - print(f"Done loading {conflation_file}...") - except Exception as e: - logger.error(f"Exception thrown in load_conflation({conflation_file}), line {line_counter}: {e}") - return False + # Fail if the file was empty. 
+ if line_counter == 0: + raise RuntimeError(f"Conflation file {conflation_file} is empty.") + + print(f"Done loading {conflation_file}...") # return to the caller return True @@ -439,100 +431,96 @@ async def load_compendium(self, compendium_filename: str, block_size: int) -> bo # init a line counter line_counter: int = 0 - try: - term2id_redis: RedisConnection = await self.get_redis("eq_id_to_id_db") - id2eqids_redis: RedisConnection = await self.get_redis("id_to_eqids_db") - id2type_redis: RedisConnection = await self.get_redis("id_to_type_db") - info_content_redis: RedisConnection = await self.get_redis("info_content_db") + term2id_redis: RedisConnection = await self.get_redis("eq_id_to_id_db") + id2eqids_redis: RedisConnection = await self.get_redis("id_to_eqids_db") + id2type_redis: RedisConnection = await self.get_redis("id_to_type_db") + info_content_redis: RedisConnection = await self.get_redis("info_content_db") + + term2id_pipeline = term2id_redis.pipeline() + id2eqids_pipeline = id2eqids_redis.pipeline() + id2type_pipeline = id2type_redis.pipeline() + info_content_pipeline = info_content_redis.pipeline() + + with open(compendium_filename, "r", encoding="utf-8") as compendium: + logger.info(f"Processing {compendium_filename}...") + + # for each line in the file + for line in compendium: + line_counter = line_counter + 1 + + # load the line into memory + instance: dict = json.loads(line) + + # save the identifier + # "The" identifier is the first one in the presorted identifiers list + identifier: str = instance["identifiers"][0]["i"] + + # We want to accumulate statistics for each implied type as well, though we are only keeping the + # leaf type in the file (and redis). so now is the time to expand. We'll regenerate the same + # list on output. 
+ semantic_types = self.get_ancestors(instance["type"]) + + # for each semantic type in the list + for semantic_type in semantic_types: + # save the semantic type in a set to avoid duplicates + self.semantic_types.add(semantic_type) + + # create a source prefix if it has not been encountered + if self.source_prefixes.get(semantic_type) is None: + self.source_prefixes[semantic_type] = {} + + # go through each equivalent identifier in the data row + # each will be assigned the semantic type information + for equivalent_id in instance["identifiers"]: + # split the identifier to just get the data source out of the curie + source_prefix: str = equivalent_id["i"].split(":")[0] + + # save the source prefix if no already there + if self.source_prefixes[semantic_type].get(source_prefix) is None: + self.source_prefixes[semantic_type][source_prefix] = 1 + # else just increment the count for the semantic type/source + else: + self.source_prefixes[semantic_type][source_prefix] += 1 - term2id_pipeline = term2id_redis.pipeline() - id2eqids_pipeline = id2eqids_redis.pipeline() - id2type_pipeline = id2type_redis.pipeline() - info_content_pipeline = info_content_redis.pipeline() + # equivalent_id might be an array, where the first element is + # the identifier, or it might just be a string. not worrying about that case yet. 
+ equivalent_id = equivalent_id["i"] + term2id_pipeline.set(equivalent_id.upper(), identifier) + # term2id_pipeline.set(equivalent_id, identifier) - with open(compendium_filename, "r", encoding="utf-8") as compendium: - logger.info(f"Processing {compendium_filename}...") + id2eqids_pipeline.set(identifier, json.dumps(instance["identifiers"])) + id2type_pipeline.set(identifier, instance["type"]) - # for each line in the file - for line in compendium: - line_counter = line_counter + 1 + # if there is information content add it to the cache + if "ic" in instance and instance["ic"] is not None: + info_content_pipeline.set(identifier, instance["ic"]) - # load the line into memory - instance: dict = json.loads(line) - - # save the identifier - # "The" identifier is the first one in the presorted identifiers list - identifier: str = instance["identifiers"][0]["i"] - - # We want to accumulate statistics for each implied type as well, though we are only keeping the - # leaf type in the file (and redis). so now is the time to expand. We'll regenerate the same - # list on output. 
- semantic_types = self.get_ancestors(instance["type"]) - - # for each semantic type in the list - for semantic_type in semantic_types: - # save the semantic type in a set to avoid duplicates - self.semantic_types.add(semantic_type) - - # create a source prefix if it has not been encountered - if self.source_prefixes.get(semantic_type) is None: - self.source_prefixes[semantic_type] = {} - - # go through each equivalent identifier in the data row - # each will be assigned the semantic type information - for equivalent_id in instance["identifiers"]: - # split the identifier to just get the data source out of the curie - source_prefix: str = equivalent_id["i"].split(":")[0] - - # save the source prefix if no already there - if self.source_prefixes[semantic_type].get(source_prefix) is None: - self.source_prefixes[semantic_type][source_prefix] = 1 - # else just increment the count for the semantic type/source - else: - self.source_prefixes[semantic_type][source_prefix] += 1 - - # equivalent_id might be an array, where the first element is - # the identifier, or it might just be a string. not worrying about that case yet. 
- equivalent_id = equivalent_id["i"] - term2id_pipeline.set(equivalent_id.upper(), identifier) - # term2id_pipeline.set(equivalent_id, identifier) - - id2eqids_pipeline.set(identifier, json.dumps(instance["identifiers"])) - id2type_pipeline.set(identifier, instance["type"]) - - # if there is information content add it to the cache - if "ic" in instance and instance["ic"] is not None: - info_content_pipeline.set(identifier, instance["ic"]) - - if self._test_mode != 1 and line_counter % block_size == 0: - await RedisConnection.execute_pipeline(term2id_pipeline) - await RedisConnection.execute_pipeline(id2eqids_pipeline) - await RedisConnection.execute_pipeline(id2type_pipeline) - await RedisConnection.execute_pipeline(info_content_pipeline) - - # Pipeline executed create a new one error - term2id_pipeline = term2id_redis.pipeline() - id2eqids_pipeline = id2eqids_redis.pipeline() - id2type_pipeline = id2type_redis.pipeline() - info_content_pipeline = info_content_redis.pipeline() - - logger.info(f"{line_counter} {compendium_filename} lines processed") - - if self._test_mode != 1: + if self._test_mode != 1 and line_counter % block_size == 0: await RedisConnection.execute_pipeline(term2id_pipeline) await RedisConnection.execute_pipeline(id2eqids_pipeline) await RedisConnection.execute_pipeline(id2type_pipeline) await RedisConnection.execute_pipeline(info_content_pipeline) - logger.info(f"{line_counter} {compendium_filename} total lines processed") + # Pipeline executed create a new one error + term2id_pipeline = term2id_redis.pipeline() + id2eqids_pipeline = id2eqids_redis.pipeline() + id2type_pipeline = id2type_redis.pipeline() + info_content_pipeline = info_content_redis.pipeline() - if line_counter == 0: - raise RuntimeError(f"Compendium file {compendium_filename} is empty.") + logger.info(f"{line_counter} {compendium_filename} lines processed") - print(f"Done loading {compendium_filename}...") - except Exception as e: - logger.error(f"Exception thrown in 
load_compendium({compendium_filename}), line {line_counter}: {e}") - return False + if self._test_mode != 1: + await RedisConnection.execute_pipeline(term2id_pipeline) + await RedisConnection.execute_pipeline(id2eqids_pipeline) + await RedisConnection.execute_pipeline(id2type_pipeline) + await RedisConnection.execute_pipeline(info_content_pipeline) + + logger.info(f"{line_counter} {compendium_filename} total lines processed") + + if line_counter == 0: + raise RuntimeError(f"Compendium file {compendium_filename} is empty.") + + print(f"Done loading {compendium_filename}...") # return to the caller return True From 4bdbc6f95d643497be63b78c7e727b2bc6dfa448 Mon Sep 17 00:00:00 2001 From: Gaurav Vaidya Date: Mon, 21 Apr 2025 17:01:33 -0400 Subject: [PATCH 4/8] Removed catch-all exceptions where possible in node_normalizer. --- node_normalizer/normalizer.py | 515 ++++++++++++++++------------------ node_normalizer/server.py | 55 ++-- 2 files changed, 271 insertions(+), 299 deletions(-) diff --git a/node_normalizer/normalizer.py b/node_normalizer/normalizer.py index 333d16f..512553b 100644 --- a/node_normalizer/normalizer.py +++ b/node_normalizer/normalizer.py @@ -68,33 +68,29 @@ async def normalize_message(app: FastAPI, message: Message) -> Message: Given a TRAPI message, updates the message to include a normalized qgraph, kgraph, and results """ - try: - ret = Message() - - logger.debug(f"message.query_graph is None: {message.query_graph is None}") - if message.query_graph is not None: - merged_qgraph = await normalize_qgraph(app, message.query_graph) - ret.query_graph = merged_qgraph - logger.debug(f"Merged Qgraph: {merged_qgraph}") - - logger.debug(f"message.knowledge_graph is None: {message.knowledge_graph is None}") - if message.knowledge_graph is not None: - merged_kgraph, node_id_map, edge_id_map = await normalize_kgraph(app, message.knowledge_graph) - ret.knowledge_graph = merged_kgraph - logger.debug(f"Merged Kgraph: {merged_kgraph}") - 
logger.debug(f"node_id_map: {node_id_map}") - logger.debug(f"edge_id_map: {edge_id_map}") - - logger.debug(f"message.results is None: {message.results is None}") - if message.results is not None: - merged_results = await normalize_results(app, message.results, node_id_map, edge_id_map) - ret.results = merged_results - logger.debug(f"Merged Results: {merged_results}") - - return ret - except Exception as e: - exception_str = "".join(traceback.format_exc()) - logger.error(f'Exception: {exception_str}') + ret = Message() + + logger.debug(f"message.query_graph is None: {message.query_graph is None}") + if message.query_graph is not None: + merged_qgraph = await normalize_qgraph(app, message.query_graph) + ret.query_graph = merged_qgraph + logger.debug(f"Merged Qgraph: {merged_qgraph}") + + logger.debug(f"message.knowledge_graph is None: {message.knowledge_graph is None}") + if message.knowledge_graph is not None: + merged_kgraph, node_id_map, edge_id_map = await normalize_kgraph(app, message.knowledge_graph) + ret.knowledge_graph = merged_kgraph + logger.debug(f"Merged Kgraph: {merged_kgraph}") + logger.debug(f"node_id_map: {node_id_map}") + logger.debug(f"edge_id_map: {edge_id_map}") + + logger.debug(f"message.results is None: {message.results is None}") + if message.results is not None: + merged_results = await normalize_results(app, message.results, node_id_map, edge_id_map) + ret.results = merged_results + logger.debug(f"Merged Results: {merged_results}") + + return ret async def normalize_results(app, @@ -117,87 +113,78 @@ async def normalize_results(app, node_binding_seen = set() - try: - for node_code, node_bindings in result.node_bindings.items(): - merged_node_bindings = [] - for n_bind in node_bindings: - merged_binding = n_bind.dict() - # merged_binding['id'] = node_id_map[n_bind.id.__root__] - merged_binding['id'] = node_id_map[n_bind.id] - - # get the information content value - ic_attrib = await get_info_content_attribute(app, merged_binding['id']) - - # 
did we get a good attribute dict - if ic_attrib: - if 'attributes' in merged_binding and merged_binding['attributes'] is not None: - merged_binding['attributes'].append(ic_attrib) - else: - merged_binding['attributes'] = [ic_attrib] - - node_binding_information = [ - "atts" if k == 'attributes' - else (k, tuple(v)) if isinstance(v, list) - else (k, v) - for k, v in merged_binding.items() - ] + for node_code, node_bindings in result.node_bindings.items(): + merged_node_bindings = [] + for n_bind in node_bindings: + merged_binding = n_bind.dict() + # merged_binding['id'] = node_id_map[n_bind.id.__root__] + merged_binding['id'] = node_id_map[n_bind.id] - # if there are attributes in the node binding - if 'attributes' in merged_binding: - # storage for the pydantic Attributes - attribs = [] + # get the information content value + ic_attrib = await get_info_content_attribute(app, merged_binding['id']) - # the items in list of attributes must be of type Attribute - # in order to reuse hash method - if merged_binding['attributes'] is not None: - for attrib in merged_binding['attributes']: - new_attrib = Attribute.parse_obj(attrib) + # did we get a good attribute dict + if ic_attrib: + if 'attributes' in merged_binding and merged_binding['attributes'] is not None: + merged_binding['attributes'].append(ic_attrib) + else: + merged_binding['attributes'] = [ic_attrib] + + node_binding_information = [ + "atts" if k == 'attributes' + else (k, tuple(v)) if isinstance(v, list) + else (k, v) + for k, v in merged_binding.items() + ] + + # if there are attributes in the node binding + if 'attributes' in merged_binding: + # storage for the pydantic Attributes + attribs = [] + + # the items in list of attributes must be of type Attribute + # in order to reuse hash method + if merged_binding['attributes'] is not None: + for attrib in merged_binding['attributes']: + new_attrib = Attribute.parse_obj(attrib) + + # add the new Attribute to the list + attribs.append(new_attrib) + + # call to 
get the hash + atty_hash = _hash_attributes(attribs) + node_binding_information.append(atty_hash) + node_binding_hash = frozenset(node_binding_information) + + if node_binding_hash in node_binding_seen: + continue + else: + node_binding_seen.add(node_binding_hash) + merged_node_bindings.append(merged_binding) - # add the new Attribute to the list - attribs.append(new_attrib) + merged_result['node_bindings'][node_code] = merged_node_bindings - # call to get the hash - atty_hash = _hash_attributes(attribs) - node_binding_information.append(atty_hash) - node_binding_hash = frozenset(node_binding_information) + edge_binding_seen = set() + for analysis in result.analyses: + for edge_code, edge_bindings in analysis.edge_bindings.items(): + merged_edge_bindings = [] + for e_bind in edge_bindings: + merged_binding = e_bind.dict() + merged_binding['id'] = edge_id_map[e_bind.id] + + edge_binding_hash = frozenset([ + (k, freeze(v)) + for k, v in merged_binding.items() + ]) - if node_binding_hash in node_binding_seen: + if edge_binding_hash in edge_binding_seen: continue else: - node_binding_seen.add(node_binding_hash) - merged_node_bindings.append(merged_binding) + edge_binding_seen.add(edge_binding_hash) + merged_edge_bindings.append(merged_binding) - merged_result['node_bindings'][node_code] = merged_node_bindings - - except Exception as e: - exception_str = "".join(traceback.format_exc()) - logger.error(f'Exception: {exception_str}') - - edge_binding_seen = set() - - try: - for analysis in result.analyses: - for edge_code, edge_bindings in analysis.edge_bindings.items(): - merged_edge_bindings = [] - for e_bind in edge_bindings: - merged_binding = e_bind.dict() - merged_binding['id'] = edge_id_map[e_bind.id] - - edge_binding_hash = frozenset([ - (k, freeze(v)) - for k, v in merged_binding.items() - ]) - - if edge_binding_hash in edge_binding_seen: - continue - else: - edge_binding_seen.add(edge_binding_hash) - merged_edge_bindings.append(merged_binding) - - 
analysis.edge_bindings[edge_code] = merged_edge_bindings - merged_result['analyses'].append(analysis.dict()) - except Exception as e: - logger.exception(e) + analysis.edge_bindings[edge_code] = merged_edge_bindings + merged_result['analyses'].append(analysis.dict()) try: # This used to have some list comprehension based on types. But in TRAPI 1.1 the list/dicts get pretty deep. @@ -294,152 +281,148 @@ async def normalize_kgraph( node_id_map: Dict[str, str] = {} edge_id_map: Dict[str, str] = {} - try: - # Map for each node id (curie) and its primary id - node_id_map: Dict[str, str] = {} + # Map for each node id (curie) and its primary id + node_id_map: Dict[str, str] = {} - # Map for each edge id and its primary id - edge_id_map: Dict[str, str] = {} + # Map for each edge id and its primary id + edge_id_map: Dict[str, str] = {} - # Map for each edge to its s,p,r,o signature - primary_edges: Dict[Tuple[str, str, Optional[str], str, Union[UUID, int]], str] = {} + # Map for each edge to its s,p,r,o signature + primary_edges: Dict[Tuple[str, str, Optional[str], str, Union[UUID, int]], str] = {} - # cache for primary node ids - primary_nodes_seen = set() + # cache for primary node ids + primary_nodes_seen = set() - # Count of times a node has been merged for attribute merging - node_merge_count: Dict[str, int] = {} + # Count of times a node has been merged for attribute merging + node_merge_count: Dict[str, int] = {} - # cache for nodes - nodes_seen = set() + # cache for nodes + nodes_seen = set() - # cache for subject, predicate, relation, object, attribute hash tuples - edges_seen: Set[Tuple[str, str, str, str, Union[UUID, int]]] = set() + # cache for subject, predicate, relation, object, attribute hash tuples + edges_seen: Set[Tuple[str, str, str, str, Union[UUID, int]]] = set() - for node_id, node in kgraph.nodes.items(): - if node_id in nodes_seen: - continue + for node_id, node in kgraph.nodes.items(): + if node_id in nodes_seen: + continue - nodes_seen.add(node_id) 
- node_id_map[node_id] = node_id # expected to overridden by primary id + nodes_seen.add(node_id) + node_id_map[node_id] = node_id # expected to overridden by primary id - merged_node = node.dict() + merged_node = node.dict() - equivalent_curies = await get_equivalent_curies(app, node_id) + equivalent_curies = await get_equivalent_curies(app, node_id) - if equivalent_curies[node_id]: - primary_id = equivalent_curies[node_id]['id']['identifier'] - node_id_map[node_id] = primary_id + if equivalent_curies[node_id]: + primary_id = equivalent_curies[node_id]['id']['identifier'] + node_id_map[node_id] = primary_id - if primary_id in primary_nodes_seen: - merged_node = _merge_node_attributes( - node_a=merged_kgraph['nodes'][primary_id], - node_b=node.dict(), - merged_count=node_merge_count[primary_id] - ) - merged_kgraph['nodes'][primary_id] = merged_node - node_merge_count[primary_id] += 1 - continue - else: - node_merge_count[primary_id] = 0 + if primary_id in primary_nodes_seen: + merged_node = _merge_node_attributes( + node_a=merged_kgraph['nodes'][primary_id], + node_b=node.dict(), + merged_count=node_merge_count[primary_id] + ) + merged_kgraph['nodes'][primary_id] = merged_node + node_merge_count[primary_id] += 1 + continue + else: + node_merge_count[primary_id] = 0 - primary_nodes_seen.add(primary_id) + primary_nodes_seen.add(primary_id) - if 'label' in equivalent_curies[node_id]['id']: - primary_label = equivalent_curies[node_id]['id']['label'] - elif 'name' in merged_node: - primary_label = merged_node['name'] + if 'label' in equivalent_curies[node_id]['id']: + primary_label = equivalent_curies[node_id]['id']['label'] + elif 'name' in merged_node: + primary_label = merged_node['name'] + else: + primary_label = '' + + merged_node['name'] = primary_label + + # Even if there's already a same_as attribute we add another + # since it is coming from a new source + if 'equivalent_identifiers' in equivalent_curies[node_id]: + same_as_attribute = { + 'attribute_type_id': 
'biolink:same_as', + 'value': [ + node['identifier'] + for node in equivalent_curies[node_id]['equivalent_identifiers'] + ], + 'original_attribute_name': 'equivalent_identifiers', + "value_type_id": "EDAM:data_0006", + + # TODO, should we add the app version as the source + # or perhaps the babel/redis cache version + # This will make unit testing a little more tricky + # see https://stackoverflow.com/q/57624731 + + # 'source': f'{app.title} {app.version}', + } + if 'attributes' in merged_node and merged_node['attributes']: + merged_node['attributes'].append(same_as_attribute) else: - primary_label = '' - - merged_node['name'] = primary_label - - # Even if there's already a same_as attribute we add another - # since it is coming from a new source - if 'equivalent_identifiers' in equivalent_curies[node_id]: - same_as_attribute = { - 'attribute_type_id': 'biolink:same_as', - 'value': [ - node['identifier'] - for node in equivalent_curies[node_id]['equivalent_identifiers'] - ], - 'original_attribute_name': 'equivalent_identifiers', - "value_type_id": "EDAM:data_0006", - - # TODO, should we add the app version as the source - # or perhaps the babel/redis cache version - # This will make unit testing a little more tricky - # see https://stackoverflow.com/q/57624731 - - # 'source': f'{app.title} {app.version}', - } - if 'attributes' in merged_node and merged_node['attributes']: - merged_node['attributes'].append(same_as_attribute) - else: - merged_node['attributes'] = [same_as_attribute] + merged_node['attributes'] = [same_as_attribute] - if 'type' in equivalent_curies[node_id]: - if type(equivalent_curies[node_id]['type']) is list: - merged_node['categories'] = equivalent_curies[node_id]['type'] - else: - merged_node['categories'] = [equivalent_curies[node_id]['type']] + if 'type' in equivalent_curies[node_id]: + if type(equivalent_curies[node_id]['type']) is list: + merged_node['categories'] = equivalent_curies[node_id]['type'] + else: + merged_node['categories'] = 
[equivalent_curies[node_id]['type']] - # get the information content value - ic_attrib = await get_info_content_attribute(app, node_id) + # get the information content value + ic_attrib = await get_info_content_attribute(app, node_id) - # did we get a good attribute dict - if ic_attrib: - # add the attribute to the node - merged_node['attributes'].append(ic_attrib) + # did we get a good attribute dict + if ic_attrib: + # add the attribute to the node + merged_node['attributes'].append(ic_attrib) - merged_kgraph['nodes'][primary_id] = merged_node - else: - merged_kgraph['nodes'][node_id] = merged_node - - for edge_id, edge in kgraph.edges.items(): - # Accessing __root__ directly seems wrong, - # https://github.com/samuelcolvin/pydantic/issues/730 - # could also do str(edge.subject) - if edge.subject in node_id_map: - primary_subject = node_id_map[edge.subject] - else: - # should we throw a validation error here? - primary_subject = edge.subject + merged_kgraph['nodes'][primary_id] = merged_node + else: + merged_kgraph['nodes'][node_id] = merged_node + + for edge_id, edge in kgraph.edges.items(): + # Accessing __root__ directly seems wrong, + # https://github.com/samuelcolvin/pydantic/issues/730 + # could also do str(edge.subject) + if edge.subject in node_id_map: + primary_subject = node_id_map[edge.subject] + else: + # should we throw a validation error here? 
+ primary_subject = edge.subject - if edge.object in node_id_map: - primary_object = node_id_map[edge.object] - else: - primary_object = edge.object + if edge.object in node_id_map: + primary_object = node_id_map[edge.object] + else: + primary_object = edge.object - hashed_attributes = _hash_attributes(edge.attributes) + hashed_attributes = _hash_attributes(edge.attributes) - if hashed_attributes is False: - # we couldn't hash the attribute so assume unique - hashed_attributes = uuid.uuid4() + if hashed_attributes is False: + # we couldn't hash the attribute so assume unique + hashed_attributes = uuid.uuid4() - triple = ( - primary_subject, - edge.predicate, - primary_object, - hashed_attributes - ) + triple = ( + primary_subject, + edge.predicate, + primary_object, + hashed_attributes + ) - if triple in edges_seen: - edge_id_map[edge_id] = primary_edges[triple] - continue - else: - primary_edges[triple] = edge_id - edge_id_map[edge_id] = edge_id + if triple in edges_seen: + edge_id_map[edge_id] = primary_edges[triple] + continue + else: + primary_edges[triple] = edge_id + edge_id_map[edge_id] = edge_id - edges_seen.add(triple) - merged_edge = edge.dict() + edges_seen.add(triple) + merged_edge = edge.dict() - merged_edge['subject'] = primary_subject - merged_edge['object'] = primary_object - merged_kgraph['edges'][edge_id] = merged_edge - except Exception as e: - exception_str = "".join(traceback.format_exc()) - logger.error(f'Exception: {exception_str}') + merged_edge['subject'] = primary_subject + merged_edge['object'] = primary_object + merged_kgraph['edges'][edge_id] = merged_edge return KnowledgeGraph.parse_obj(merged_kgraph), node_id_map, edge_id_map @@ -854,39 +837,35 @@ async def get_curie_prefixes( """ ret_val: dict = {} # storage for the returned data - try: - # was an arg passed in - if semantic_types: - for item in semantic_types: - # get the curies for this type - curies = await app.state.curie_to_bl_type_db.get(item, encoding='utf-8') + # was an arg 
passed in + if semantic_types: + for item in semantic_types: + # get the curies for this type + curies = await app.state.curie_to_bl_type_db.get(item, encoding='utf-8') - # did we get any data - if not curies: - curies = '{' + f'"{item}"' + ': "Not found"}' + # did we get any data + if not curies: + curies = '{' + f'"{item}"' + ': "Not found"}' - curies = json.loads(curies) + curies = json.loads(curies) - # set the return data - ret_val[item] = {'curie_prefix': curies} - else: - types = await app.state.curie_to_bl_type_db.lrange('semantic_types', 0, -1, encoding='utf-8') + # set the return data + ret_val[item] = {'curie_prefix': curies} + else: + types = await app.state.curie_to_bl_type_db.lrange('semantic_types', 0, -1, encoding='utf-8') - for item in types: - # get the curies for this type - curies = await app.state.curie_to_bl_type_db.get(item, encoding='utf-8') + for item in types: + # get the curies for this type + curies = await app.state.curie_to_bl_type_db.get(item, encoding='utf-8') - # did we get any data - if not curies: - curies = '{' + f'"{item}"' + ': "Not found"}' + # did we get any data + if not curies: + curies = '{' + f'"{item}"' + ': "Not found"}' - curies = json.loads(curies) + curies = json.loads(curies) - # set the return data - ret_val[item] = {'curie_prefix': curies} - except Exception as e: - exception_str = "".join(traceback.format_exc()) - logger.error(f'Exception: {exception_str}') + # set the return data + ret_val[item] = {'curie_prefix': curies} return ret_val @@ -898,35 +877,31 @@ def _merge_node_attributes(node_a: Dict, node_b, merged_count: int) -> Dict: :param merged_count: the number of nodes merged into node_a **upon entering this fx** """ - try: - if not ('attributes' in node_b and node_b['attributes']): - return node_a - - if merged_count == 0: - if 'attributes' in node_a and node_a['attributes']: - new_attribute_list = [] - for attribute in node_a['attributes']: - new_dict = {} - for k, v in attribute.items(): - 
new_dict[f"{k}.1"] = v - new_attribute_list.append(new_dict) - - node_a['attributes'] = new_attribute_list - - # Need to DRY this off - b_attr_id = merged_count + 2 - if 'attributes' in node_b and node_b['attributes']: + if not ('attributes' in node_b and node_b['attributes']): + return node_a + + if merged_count == 0: + if 'attributes' in node_a and node_a['attributes']: new_attribute_list = [] - for attribute in node_b['attributes']: + for attribute in node_a['attributes']: new_dict = {} for k, v in attribute.items(): - new_dict[f"{k}.{b_attr_id}"] = v + new_dict[f"{k}.1"] = v new_attribute_list.append(new_dict) - node_a['attributes'] = node_a['attributes'] + new_attribute_list - except Exception as e: - exception_str = "".join(traceback.format_exc()) - logger.error(f'Exception: {exception_str}') + node_a['attributes'] = new_attribute_list + + # Need to DRY this off + b_attr_id = merged_count + 2 + if 'attributes' in node_b and node_b['attributes']: + new_attribute_list = [] + for attribute in node_b['attributes']: + new_dict = {} + for k, v in attribute.items(): + new_dict[f"{k}.{b_attr_id}"] = v + new_attribute_list.append(new_dict) + + node_a['attributes'] = node_a['attributes'] + new_attribute_list return node_a diff --git a/node_normalizer/server.py b/node_normalizer/server.py index f124eb2..5b24b3d 100644 --- a/node_normalizer/server.py +++ b/node_normalizer/server.py @@ -198,35 +198,32 @@ async def async_query(async_query: reasoner_pydantic.AsyncQuery): async def async_query_task(async_query: reasoner_pydantic.AsyncQuery): - try: - async_query.message = await normalize_message(app, async_query.message) - session = requests.Session() - retries = Retry( - total=3, - backoff_factor=3, - status_forcelist=[429, 500, 502, 503, 504], - method_whitelist=[ - "HEAD", - "GET", - "PUT", - "DELETE", - "OPTIONS", - "TRACE", - "POST", - ], - ) - session.mount("http://", HTTPAdapter(max_retries=retries)) - session.mount("https://", HTTPAdapter(max_retries=retries)) - 
logger.info(f"sending callback to: {async_query.callback}") - - post_response = session.post( - url=async_query.callback, - headers={"Content-Type": "application/json", "Accept": "application/json"}, - data=async_query.json(), - ) - logger.info(f"async_query post status code: {post_response.status_code}") - except BaseException as e: - logger.exception(e) + async_query.message = await normalize_message(app, async_query.message) + session = requests.Session() + retries = Retry( + total=3, + backoff_factor=3, + status_forcelist=[429, 500, 502, 503, 504], + method_whitelist=[ + "HEAD", + "GET", + "PUT", + "DELETE", + "OPTIONS", + "TRACE", + "POST", + ], + ) + session.mount("http://", HTTPAdapter(max_retries=retries)) + session.mount("https://", HTTPAdapter(max_retries=retries)) + logger.info(f"sending callback to: {async_query.callback}") + + post_response = session.post( + url=async_query.callback, + headers={"Content-Type": "application/json", "Accept": "application/json"}, + data=async_query.json(), + ) + logger.info(f"async_query post status code: {post_response.status_code}") @app.get( From ce3ebf6eb0d0deb92dbe7b71cf3c54f7448c41b7 Mon Sep 17 00:00:00 2001 From: Gaurav Vaidya Date: Mon, 21 Apr 2025 17:04:14 -0400 Subject: [PATCH 5/8] Added on:push trigger for testing. --- .github/workflows/release.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 800d57b..e9d8f8f 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -1,6 +1,7 @@ name: 'Publish to GitHub Packages' on: + push: release: types: [published] From 5956d3ed270ef0f05eddf190beaa3031cf3a35ef Mon Sep 17 00:00:00 2001 From: Gaurav Vaidya Date: Mon, 21 Apr 2025 17:16:29 -0400 Subject: [PATCH 6/8] Changed default logging to INFO and added env var. 
--- node_normalizer/util.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/node_normalizer/util.py b/node_normalizer/util.py index cb40e05..ee47691 100644 --- a/node_normalizer/util.py +++ b/node_normalizer/util.py @@ -16,16 +16,20 @@ class LoggingUtil(object): """ Logging utility controlling format and setting initial logging level """ @staticmethod - def init_logging(log_file_path=None, log_file_level="ERROR"): + def init_logging(log_file_path=None, log_file_level=None): + # If log_file_path is set, we use that. Otherwise, we use the LOG_LEVEL environmental variable. + if not log_file_level: + log_file_level = os.getenv("LOG_LEVEL", "INFO") + dictConfig({ "version": 1, "disable_existing_loggers": False, "formatters": {"default": {"format": "%(asctime)s | %(levelname)s | %(module)s:%(funcName)s | %(message)s"}}, "handlers": { - "console": {"level": "ERROR", "class": "logging.StreamHandler", "formatter": "default"}, + "console": {"level": log_file_level, "class": "logging.StreamHandler", "formatter": "default"}, }, "loggers": { - "node-norm": {"handlers": ["console"], "level": os.getenv("LOG_LEVEL", "ERROR")}, + "node-norm": {"handlers": ["console"], "level": log_file_level}, }, }) # add gunicorn handlers and configure fastapi loggers From 2618bb98e244f5e8c6b74eac6460d3019282f6c4 Mon Sep 17 00:00:00 2001 From: Gaurav Vaidya Date: Mon, 21 Apr 2025 17:27:40 -0400 Subject: [PATCH 7/8] Added arguments to logging. 
--- node_normalizer/normalizer.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/node_normalizer/normalizer.py b/node_normalizer/normalizer.py index 512553b..b92bc9c 100644 --- a/node_normalizer/normalizer.py +++ b/node_normalizer/normalizer.py @@ -642,7 +642,9 @@ async def get_normalized_nodes( } end_time = time.time_ns() - logger.info(f"Normalized {len(curies)} nodes in {(end_time - start_time)/1_000_000:.2f} ms: {sorted(curies)}") + logger.info(f"Normalized {len(curies)} nodes in {(end_time - start_time)/1_000_000:.2f} ms with arguments " + + f"(curies={curies}, conflate_gene_protein={conflate_gene_protein}, conflate_chemical_drug={conflate_chemical_drug}, " + + f"include_descriptions={include_descriptions}, include_individual_types={include_individual_types})") return normal_nodes From 531d7c352e6778adc024825fd779d0d913c9ef79 Mon Sep 17 00:00:00 2001 From: Gaurav Vaidya Date: Mon, 21 Apr 2025 17:28:06 -0400 Subject: [PATCH 8/8] Removed on:push trigger after testing. --- .github/workflows/release.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index e9d8f8f..800d57b 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -1,7 +1,6 @@ name: 'Publish to GitHub Packages' on: - push: release: types: [published]