246 changes: 119 additions & 127 deletions node_normalizer/load_compendia.py
@@ -84,48 +84,44 @@ async def load(compendia_files, block_size, dry_run) -> bool:
# 5-X: conflation databases consisting of canonical_id -> (list of conflated canonical_ids)
# Each of these databases corresponds to a particular conflation e.g. gene/protein or chemical/drug

try:
# get the list of files in the directory
types_prefixes_redis: redis_adapter.RedisConnection = await get_redis("curie_to_bl_type_db")
# for each file validate and process

# check the validity of the files
for comp in compendia_files:
if not validate_compendia(comp):
logger.warning(f"Compendia file {comp} is invalid.")
return False

for comp in compendia_files:
if not validate_compendia(comp):
logger.warning(f"Compendia file {comp} is invalid.")
return False


for comp in compendia_files:
# check the validity of the file

if not validate_compendia(comp):
logger.warning(f"Compendia file {comp} is invalid.")
continue

# try to load the file
loaded = await load_compendium(comp, block_size, dry_run)
semantic_types_redis_pipeline = types_prefixes_redis.pipeline()
# @TODO add metadata about files, e.g. checksum, to this object
# semantic_types_redis_pipeline.set(f"file-{str(comp)}", json.dumps({"source_prefixes": self.source_prefixes}))
if dry_run:
response = await redis_adapter.RedisConnection.execute_pipeline(semantic_types_redis_pipeline)
if asyncio.coroutines.iscoroutine(response):
await response
# self.source_prefixes = {}
if not loaded:
logger.warning(f"Compendia file {comp} did not load.")
continue
# merge all semantic counts from other files / loaders
await merge_semantic_meta_data()
except Exception as e:
logger.error(f"Exception thrown in load(): {e}")
raise e
# get the list of files in the directory
types_prefixes_redis: redis_adapter.RedisConnection = await get_redis("curie_to_bl_type_db")
# for each file validate and process

# check the validity of the files
for comp in compendia_files:
if not validate_compendia(comp):
logger.warning(f"Compendia file {comp} is invalid.")
return False

for comp in compendia_files:
if not validate_compendia(comp):
logger.warning(f"Compendia file {comp} is invalid.")
return False


for comp in compendia_files:
# check the validity of the file

if not validate_compendia(comp):
logger.warning(f"Compendia file {comp} is invalid.")
continue

# try to load the file
loaded = await load_compendium(comp, block_size, dry_run)
semantic_types_redis_pipeline = types_prefixes_redis.pipeline()
# @TODO add metadata about files, e.g. checksum, to this object
# semantic_types_redis_pipeline.set(f"file-{str(comp)}", json.dumps({"source_prefixes": self.source_prefixes}))
if dry_run:
response = await redis_adapter.RedisConnection.execute_pipeline(semantic_types_redis_pipeline)
if asyncio.coroutines.iscoroutine(response):
await response
# self.source_prefixes = {}
if not loaded:
logger.warning(f"Compendia file {comp} did not load.")
continue
# merge all semantic counts from other files / loaders
await merge_semantic_meta_data()

# return to the caller
return True
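
With the try/except removed, load() no longer catches and re-raises its own errors: a failure while validating files or talking to Redis now propagates to whoever awaits load(). A minimal sketch of a hypothetical caller under that assumption — the wrapper name, import path, file path, and block size below are illustrative, not part of the project:

import asyncio
import logging

from node_normalizer.load_compendia import load  # assumed import path

logger = logging.getLogger(__name__)

async def run_load(compendia_files, block_size=1000, dry_run=False):
    # load() still returns False when a compendia file fails validation;
    # Redis/JSON errors are no longer swallowed inside it, so catch them here if needed.
    try:
        return await load(compendia_files, block_size, dry_run)
    except Exception as exc:
        logger.error(f"load() raised: {exc}")
        return False

# asyncio.run(run_load(["compendia/Gene.txt"], block_size=1000, dry_run=True))  # illustrative call
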
@@ -237,97 +233,93 @@ def get_ancestors(input_type):

# init a line counter
line_counter: int = 0
try:
term2id_redis: redis_adapter.RedisConnection = await get_redis("eq_id_to_id_db")
id2eqids_redis: redis_adapter.RedisConnection = await get_redis("id_to_eqids_db")
id2type_redis: redis_adapter.RedisConnection = await get_redis("id_to_type_db")
info_content_redis: redis_adapter.RedisConnection = await get_redis("info_content_db")

term2id_pipeline = term2id_redis.pipeline()
id2eqids_pipeline = id2eqids_redis.pipeline()
id2type_pipeline = id2type_redis.pipeline()
info_content_pipeline = info_content_redis.pipeline()

with open(compendium_filename, "r", encoding="utf-8") as compendium:
logger.info(f"Processing {compendium_filename}...")

# for each line in the file
for line in compendium:
line_counter = line_counter + 1

# load the line into memory
instance: dict = json.loads(line)

# save the identifier
# "The" identifier is the first one in the presorted identifiers list
identifier: str = instance["identifiers"][0]["i"]

# We want to accumulate statistics for each implied type as well, though we are only keeping the
# leaf type in the file (and redis). so now is the time to expand. We'll regenerate the same
# list on output.
semantic_types = get_ancestors(instance["type"])

# for each semantic type in the list
for semantic_type in semantic_types:
# save the semantic type in a set to avoid duplicates
semantic_types.add(semantic_type)

# create a source prefix if it has not been encountered
if source_prefixes.get(semantic_type) is None:
source_prefixes[semantic_type] = {}

# go through each equivalent identifier in the data row
# each will be assigned the semantic type information
for equivalent_id in instance["identifiers"]:
# split the identifier to just get the data source out of the curie
source_prefix: str = equivalent_id["i"].split(":")[0]

# save the source prefix if not already there
if source_prefixes[semantic_type].get(source_prefix) is None:
source_prefixes[semantic_type][source_prefix] = 1
# else just increment the count for the semantic type/source
else:
source_prefixes[semantic_type][source_prefix] += 1

# equivalent_id might be an array, where the first element is
# the identifier, or it might just be a string. not worrying about that case yet.
equivalent_id = equivalent_id["i"]
term2id_pipeline.set(equivalent_id.upper(), identifier)
# term2id_pipeline.set(equivalent_id, identifier)

id2eqids_pipeline.set(identifier, json.dumps(instance["identifiers"]))
id2type_pipeline.set(identifier, instance["type"])

# if there is information content add it to the cache
if "ic" in instance and instance["ic"] is not None:
info_content_pipeline.set(identifier, instance["ic"])

if not dry_run and line_counter % block_size == 0:
await redis_adapter.RedisConnection.execute_pipeline(term2id_pipeline)
await redis_adapter.RedisConnection.execute_pipeline(id2eqids_pipeline)
await redis_adapter.RedisConnection.execute_pipeline(id2type_pipeline)
await redis_adapter.RedisConnection.execute_pipeline(info_content_pipeline)

# Pipeline executed; create a new one
term2id_pipeline = term2id_redis.pipeline()
id2eqids_pipeline = id2eqids_redis.pipeline()
id2type_pipeline = id2type_redis.pipeline()
info_content_pipeline = info_content_redis.pipeline()

logger.info(f"{line_counter} {compendium_filename} lines processed")

if not dry_run:
term2id_redis: redis_adapter.RedisConnection = await get_redis("eq_id_to_id_db")
id2eqids_redis: redis_adapter.RedisConnection = await get_redis("id_to_eqids_db")
id2type_redis: redis_adapter.RedisConnection = await get_redis("id_to_type_db")
info_content_redis: redis_adapter.RedisConnection = await get_redis("info_content_db")

term2id_pipeline = term2id_redis.pipeline()
id2eqids_pipeline = id2eqids_redis.pipeline()
id2type_pipeline = id2type_redis.pipeline()
info_content_pipeline = info_content_redis.pipeline()

with open(compendium_filename, "r", encoding="utf-8") as compendium:
logger.info(f"Processing {compendium_filename}...")

# for each line in the file
for line in compendium:
line_counter = line_counter + 1

# load the line into memory
instance: dict = json.loads(line)

# save the identifier
# "The" identifier is the first one in the presorted identifiers list
identifier: str = instance["identifiers"][0]["i"]

# We want to accumulate statistics for each implied type as well, though we are only keeping the
# leaf type in the file (and redis). so now is the time to expand. We'll regenerate the same
# list on output.
semantic_types = get_ancestors(instance["type"])

# for each semantic type in the list
for semantic_type in semantic_types:
# save the semantic type in a set to avoid duplicates
semantic_types.add(semantic_type)

# create a source prefix if it has not been encountered
if source_prefixes.get(semantic_type) is None:
source_prefixes[semantic_type] = {}

# go through each equivalent identifier in the data row
# each will be assigned the semantic type information
for equivalent_id in instance["identifiers"]:
# split the identifier to just get the data source out of the curie
source_prefix: str = equivalent_id["i"].split(":")[0]

# save the source prefix if not already there
if source_prefixes[semantic_type].get(source_prefix) is None:
source_prefixes[semantic_type][source_prefix] = 1
# else just increment the count for the semantic type/source
else:
source_prefixes[semantic_type][source_prefix] += 1

# equivalent_id might be an array, where the first element is
# the identifier, or it might just be a string. not worrying about that case yet.
equivalent_id = equivalent_id["i"]
term2id_pipeline.set(equivalent_id.upper(), identifier)
# term2id_pipeline.set(equivalent_id, identifier)

id2eqids_pipeline.set(identifier, json.dumps(instance["identifiers"]))
id2type_pipeline.set(identifier, instance["type"])

# if there is information content add it to the cache
if "ic" in instance and instance["ic"] is not None:
info_content_pipeline.set(identifier, instance["ic"])

if not dry_run and line_counter % block_size == 0:
await redis_adapter.RedisConnection.execute_pipeline(term2id_pipeline)
await redis_adapter.RedisConnection.execute_pipeline(id2eqids_pipeline)
await redis_adapter.RedisConnection.execute_pipeline(id2type_pipeline)
await redis_adapter.RedisConnection.execute_pipeline(info_content_pipeline)

logger.info(f"{line_counter} {compendium_filename} total lines processed")
# Pipeline executed; create a new one
term2id_pipeline = term2id_redis.pipeline()
id2eqids_pipeline = id2eqids_redis.pipeline()
id2type_pipeline = id2type_redis.pipeline()
info_content_pipeline = info_content_redis.pipeline()

logger.info(f"{line_counter} {compendium_filename} lines processed")

if not dry_run:
await redis_adapter.RedisConnection.execute_pipeline(term2id_pipeline)
await redis_adapter.RedisConnection.execute_pipeline(id2eqids_pipeline)
await redis_adapter.RedisConnection.execute_pipeline(id2type_pipeline)
await redis_adapter.RedisConnection.execute_pipeline(info_content_pipeline)

logger.info(f"{line_counter} {compendium_filename} total lines processed")

print(f"Done loading {compendium_filename}...")
except Exception as e:
logger.error(f"Exception thrown in load_compendium({compendium_filename}), line {line_counter}: {e}")
return False
print(f"Done loading {compendium_filename}...")

# return to the caller
return True
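
load_compendium() streams the compendium JSONL and batches its Redis writes: each line's SETs are queued on per-database pipelines, the pipelines are flushed every block_size lines, and a final flush after the loop picks up the last partial block. A stripped-down sketch of just that batching pattern, using the stock redis.asyncio client instead of the project's redis_adapter wrapper — host/port/db and the uppercased-CURIE key convention are placeholders/assumptions, not the project's configuration:

import json

import redis.asyncio as aioredis

async def load_jsonl(path: str, block_size: int = 1000) -> int:
    r = aioredis.Redis(host="localhost", port=6379, db=0)  # placeholder connection details
    pipe = r.pipeline()
    line_counter = 0
    with open(path, "r", encoding="utf-8") as compendium:
        for line in compendium:
            line_counter += 1
            instance = json.loads(line)
            # "The" identifier is the first entry in the presorted identifiers list
            identifier = instance["identifiers"][0]["i"]
            for equivalent_id in instance["identifiers"]:
                pipe.set(equivalent_id["i"].upper(), identifier)  # uppercased CURIE -> canonical id
            if line_counter % block_size == 0:
                await pipe.execute()  # flush the full batch
                pipe = r.pipeline()   # start a fresh pipeline for the next block, mirroring the loader
    await pipe.execute()              # flush whatever is left after the loop
    return line_counter
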
46 changes: 21 additions & 25 deletions node_normalizer/load_conflation.py
@@ -146,38 +146,34 @@ async def load_conflation(self, conflation: dict, block_size: int) -> bool:
conflation_redis_connection_name = conflation["redis_db"]
# init a line counter
line_counter: int = 0
try:
conflation_redis: RedisConnection = await self.get_redis(conflation_redis_connection_name)
conflation_pipeline = conflation_redis.pipeline()
conflation_redis: RedisConnection = await self.get_redis(conflation_redis_connection_name)
conflation_pipeline = conflation_redis.pipeline()

with open(f"{self._conflation_directory}/{conflation_file}", "r", encoding="utf-8") as cfile:
logger.info(f"Processing {conflation_file}...")
with open(f"{self._conflation_directory}/{conflation_file}", "r", encoding="utf-8") as cfile:
logger.info(f"Processing {conflation_file}...")

# for each line in the file
for line in cfile:
line_counter = line_counter + 1
# for each line in the file
for line in cfile:
line_counter = line_counter + 1

# load the line into memory
instance: dict = json.loads(line)

for identifier in instance:
# We need to include the identifier in the list of identifiers so that we know its position
conflation_pipeline.set(identifier, line)
# load the line into memory
instance: dict = json.loads(line)

if self._test_mode != 1 and line_counter % block_size == 0:
await RedisConnection.execute_pipeline(conflation_pipeline)
# Pipeline executed; create a new one
conflation_pipeline = conflation_redis.pipeline()
logger.info(f"{line_counter} {conflation_file} lines processed")
for identifier in instance:
# We need to include the identifier in the list of identifiers so that we know its position
conflation_pipeline.set(identifier, line)

if self._test_mode != 1:
if self._test_mode != 1 and line_counter % block_size == 0:
await RedisConnection.execute_pipeline(conflation_pipeline)
logger.info(f"{line_counter} {conflation_file} total lines processed")
# Pipeline executed; create a new one
conflation_pipeline = conflation_redis.pipeline()
logger.info(f"{line_counter} {conflation_file} lines processed")

if self._test_mode != 1:
await RedisConnection.execute_pipeline(conflation_pipeline)
logger.info(f"{line_counter} {conflation_file} total lines processed")

print(f"Done loading {conflation_file}...")
except Exception as e:
logger.error(f"Exception thrown in load_conflation({conflation_file}), line {line_counter}: {e}")
return False
print(f"Done loading {conflation_file}...")

# return to the caller
return True
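
Each conflation line is a JSON array of conflated canonical identifiers, and load_conflation() stores the whole line under every member id, so a lookup from any identifier in the cluster returns the complete group. A small sketch of that write/read round trip with the stock redis.asyncio client — the db number and the CDK2 gene/protein pair are purely illustrative:

import asyncio
import json

import redis.asyncio as aioredis

async def demo_conflation() -> None:
    r = aioredis.Redis(host="localhost", port=6379, db=5, decode_responses=True)  # placeholder db
    line = json.dumps(["NCBIGene:1017", "UniProtKB:P24941"])  # illustrative gene/protein cluster
    pipe = r.pipeline()
    for identifier in json.loads(line):
        pipe.set(identifier, line)  # every member keys the full cluster
    await pipe.execute()
    # Looking up the protein id returns the whole conflation group
    print(json.loads(await r.get("UniProtKB:P24941")))

# asyncio.run(demo_conflation())
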