diff --git a/README.md b/README.md index 3462a6f..639740b 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,6 @@ pip install -r requirements.txt - zebrafish.fa = This is a dummy assembly - zebrafish.gvf = This is a highly reduced GVF file https://ftp.ebi.ac.uk/pub/databases/dgva/nstd62_Brown_et_al_2012/gvf/ - human.fa = This is a full assembly which has been converted so the naming convention follows the corresponding GVF -- human_est199.gvf = This is a full GVF file https://ftp.ebi.ac.uk/pub/databases/dgva/estd199_1000_Genomes_Consortium_Phase_1/gvf/ +- human_est199.gvf = This is a full GVF file https://ftp.ebi.ac.uk/pub/databases/dgva/estd199_1000_Genomes_Consortium_Phase_1/gvf/ - drosophila_nstd134.gvf = This is a full GVF file https://ftp.ebi.ac.uk/pub/databases/dgva/nstd134_Gilks_et_al_2016/gvf/ -- drosophila_GCA_000001215.4.fa = This is a full assembly representing GCA_000001215.4 \ No newline at end of file +- drosophila_GCA_000001215.4.fa = This is a full assembly representing GCA_000001215.4 \ No newline at end of file diff --git a/convert_gvf_to_vcf/convertGVFtoVCF.py b/convert_gvf_to_vcf/convertGVFtoVCF.py index c4ff8f1..cbaa859 100644 --- a/convert_gvf_to_vcf/convertGVFtoVCF.py +++ b/convert_gvf_to_vcf/convertGVFtoVCF.py @@ -166,15 +166,21 @@ def convert_gvf_pragmas_for_vcf_header(gvf_pragmas, return unique_converted_pragmas, unique_sample_name # the function below relates to the VCF headerline (Part 2) -def generate_vcf_header_line(samples): +def generate_vcf_header_line(is_missing_format, samples): """ Generates the VCF header line using the nine mandatory headers and the sample names. + :param is_missing_format: boolean (False = Format + sample names, True = mandatory fields only) :param samples: list of samples, these will appear in the header line :return: vcf_header: a string """ - vcf_header_fields = ["#CHROM", "POS", "ID", "REF", "ALT", "QUAL", "FILTER", "INFO", "FORMAT"] - for sample in samples: - vcf_header_fields.append(sample) - vcf_header = '\t'.join(vcf_header_fields) + if is_missing_format: + vcf_header_fields = ["#CHROM", "POS", "ID", "REF", "ALT", "QUAL", "FILTER", "INFO"] + vcf_header = '\t'.join(vcf_header_fields) + else: + + vcf_header_fields = ["#CHROM", "POS", "ID", "REF", "ALT", "QUAL", "FILTER", "INFO", "FORMAT"] + for sample in samples: + vcf_header_fields.append(sample) + vcf_header = '\t'.join(vcf_header_fields) return vcf_header # the functions below relate to the GVF header @@ -259,6 +265,22 @@ def convert_gvf_features_to_vcf_objects(gvf_lines_obj_list, reference_lookup, or return standard_header_lines, list_of_vcf_objects # The functions below relate to the VCF objects +def collect_missing_format_flags(list_of_vcf_objects): + """ Returns a list of booleans for each VCF object (True if format is missing, False if format keys present) + :params: list_of_vcf_objects: list of vcf objects + :return: missing_format_flags: list of booleans (True if format is missing, False if format keys present) + """ + missing_format_flags = [] + for index in range(1, len(list_of_vcf_objects)): + if list_of_vcf_objects[index].format_keys == ['.']: + format_flag = True + missing_format_flags.append(format_flag) + else: + format_flag = False + missing_format_flags.append(format_flag) + return missing_format_flags + + def compare_vcf_objects(list_of_vcf_objects): """ Compares VCF objects in the list with the VCF object before it. Returns boolean values. :params: list_of_vcf_objects: list of vcf objects @@ -314,6 +336,69 @@ def determine_merge_or_keep_vcf_objects(list_of_vcf_objects, comparison_results, merge_or_kept_objects.append(list_of_vcf_objects[-1]) return merge_or_kept_objects +def get_chrom_pos_of_vcf_object(obj): + """ + Returns chromosome and position of an object. Used to help sort VCF object by chromosome name and by numeric position. + :params: obj : vcf line object + :return: obj.chrom, obj.pos + """ + return (obj.chrom, int(obj.pos)) + +def has_duplicates(list_of_objects): + """ Checks chrom and pos of vcf line objects to see if there are duplicates. If list of vcf object has duplicates, merge again. + :params: list_of_objects + :return: duplicate_flag - boolean value (True = has duplicates so merge again, False= no duplicates) + """ + duplicate_flag = False + list_of_duplicate_chrom_pos = [] + seen_chrom_pos = set() + for obj in list_of_objects: + chrom_pos = (obj.chrom, int(obj.pos)) + if chrom_pos in seen_chrom_pos: + # returns true to allow to merge again + duplicate_flag = True + list_of_duplicate_chrom_pos.append(chrom_pos) + else: + seen_chrom_pos.add(chrom_pos) + return duplicate_flag, list_of_duplicate_chrom_pos + + +def get_list_of_merged_vcf_objects(list_of_vcf_objects, samples): + """ Compares VCF objects, merges VCF objects, sorts VCF objects. This gives a list of sorted merged vcf objects. + :params: list_of_vcf_objects + :params: samples + :returns: merge_or_kept_vcf_objects + """ + comparison_flags = compare_vcf_objects(list_of_vcf_objects) # Identifies which VCF objects to merge + merge_or_kept_vcf_objects = determine_merge_or_keep_vcf_objects(list_of_vcf_objects, comparison_flags, samples) + merge_or_kept_vcf_objects.sort(key=get_chrom_pos_of_vcf_object) # sorting by chromosome and position + return merge_or_kept_vcf_objects + +def filter_duplicates_by_merging(chrom_pos_list, has_dups, list_of_vcf_objects, + list_of_vcf_objects_to_be_filtered, samples): + """ + :params: chrom_pos_list: list of tuples - this represents duplicate positions + :params: has_dups: boolean - True if it contains duplicates + :params: list_of_vcf_objects: list of VCF objects (GVF converted to VCF; with no merging/remove dups) + :params: list_of_vcf_objects_to_be_filtered: list of VCF objects from a previous merge + :params: samples: names + :returns: filtered_merge_or_kept_vcf_objects - list of vcf objects with duplicates from this iteration removed + """ + if has_dups: + for chrom_pos in chrom_pos_list: + chrom_to_search = chrom_pos[0] + pos_to_search = chrom_pos[1] + vcf_objects_to_merge = [] + for vcf_object in list_of_vcf_objects: + if vcf_object.chrom == chrom_to_search and vcf_object.pos == pos_to_search: + vcf_objects_to_merge.append(vcf_object) + + merge_duplicates = get_list_of_merged_vcf_objects(vcf_objects_to_merge, samples) + filtered_merge_or_kept_vcf_objects = [x for x in list_of_vcf_objects_to_be_filtered if x not in vcf_objects_to_merge] + filtered_merge_or_kept_vcf_objects.extend(merge_duplicates) + filtered_merge_or_kept_vcf_objects.sort(key=get_chrom_pos_of_vcf_object) + return filtered_merge_or_kept_vcf_objects + def main(): # Parse command line arguments parser = argparse.ArgumentParser() @@ -375,26 +460,51 @@ def main(): for header_line in header_type: vcf_output.write(f"{header_line}\n") - # Part 2 of VCF file: Write the VCF header line. This is the nine mandatory fields with its sample names. - header_fields = generate_vcf_header_line(samples) + # Part 2 of VCF file: Write the VCF header line. + # Determine if the header is the 8 mandatory fields or 8 mandatory fields + FORMAT + sample names. + missing_flags = collect_missing_format_flags(list_of_vcf_objects) # True if format keys are missing, False if present + if all(missing_flags): + is_missing_format_value = True + logger.info("No Format Keys detected. Printing mandatory VCF headers.") + else: + is_missing_format_value = False + # Write the header. + header_fields = generate_vcf_header_line(is_missing_format=is_missing_format_value, samples=samples) vcf_output.write(f"{header_fields}\n") # Part 3 of VCF file: Write the VCF data lines. This will contain info about the position in the genome, # its variants and genotype information per sample. - if len(gvf_lines_obj_list) > 0: + if (list_of_vcf_objects): logger.info("Generating the VCF datalines") # Each GVF feature has been converted to a VCF object so begin comparing and merging the VCF objects. - comparison_flags = compare_vcf_objects(list_of_vcf_objects) # Identifies which VCF objects to merge - merge_or_kept_vcf_objects = determine_merge_or_keep_vcf_objects(list_of_vcf_objects, comparison_flags, samples) + # initial merge + merge_or_kept_vcf_objects = get_list_of_merged_vcf_objects(list_of_vcf_objects, samples) + # identify if duplicates are present after merging + has_dups, chrom_pos_list = has_duplicates(merge_or_kept_vcf_objects) + # while duplicates are present, merge, then re-check for dups + max_iterations = 100 + iteration = 0 + list_of_vcf_objects_to_be_filtered = merge_or_kept_vcf_objects + while has_dups and iteration < max_iterations: + filtered_merge_or_kept_vcf_objects = filter_duplicates_by_merging(chrom_pos_list, has_dups, + list_of_vcf_objects, + list_of_vcf_objects_to_be_filtered, samples) + has_dups, chrom_pos_list = has_duplicates(filtered_merge_or_kept_vcf_objects) + iteration += 1 + list_of_vcf_objects_to_be_filtered = filtered_merge_or_kept_vcf_objects + logger.info(f"Iteration of merge (remove dups): {iteration}") + # Write the VCF objects as data lines in the VCF file. - for vcf_line_object in merge_or_kept_vcf_objects: - vcf_output.write(str(vcf_line_object) + "\n") - # vcf_output.write("\t".join(str(val) for val in line) + "\n") + if iteration != 0: + for vcf_line_object in filtered_merge_or_kept_vcf_objects: + vcf_output.write(str(vcf_line_object) + "\n") + else: + for vcf_line_object in merge_or_kept_vcf_objects: + vcf_output.write(str(vcf_line_object) + "\n") else: logger.warning("No feature lines were found for this GVF file.") vcf_output.close() logger.info("GVF to VCF conversion complete") - if __name__ == "__main__": main() diff --git a/convert_gvf_to_vcf/etc/attribute_mapper.yaml b/convert_gvf_to_vcf/etc/attribute_mapper.yaml index 839286a..67b129a 100644 --- a/convert_gvf_to_vcf/etc/attribute_mapper.yaml +++ b/convert_gvf_to_vcf/etc/attribute_mapper.yaml @@ -1,3 +1,4 @@ +# The purpose of this file is to map the GVF attribute to its corresponding VCF tag (Field:FieldKey) # Below is an example format # name: (this can be the name of gvf_attribute or if unavailable, a general name) # FIELD: (INFO/FORMAT/ALT) @@ -494,6 +495,13 @@ parid: Type: String Description: "ID of partner breakend" From: SVVCF +parent: + INFO: + FieldKey: PARENT + Number: . + Type: String + Description: "Parent" + From: [SVVCF, DGVa] phred_likelihoods: FORMAT: FieldKey: PL @@ -594,7 +602,7 @@ svlen: INFO: FieldKey: SVLEN Number: A - Type: String + Type: Integer Description: "Length of structural variant" From: SVVCF svtype: @@ -734,13 +742,6 @@ Name: Type: String Description: "Name" From: DGVa -Parent: - INFO: - FieldKey: PARENT - Number: . - Type: String - Description: "Parent" - From: DGVa phenotype_description: INFO: FieldKey: PHENODESC @@ -776,11 +777,18 @@ samples: Type: String Description: "Samples" From: DGVa +sample_name: + INFO: + FieldKey: SAMPLENAME + Number: . + Type: String + Description: "Sample Name" + From: DGVa submitter_variant_call_id: INFO: FieldKey: SVCID Number: . - Type: Integer + Type: String Description: "submitter variant call ID" From: DGVa submitter_variant_region_id: @@ -824,7 +832,14 @@ variant_call_so_id: Number: . Type: String Description: "Variant call Sequence ontology ID" - From: + From: DGVa +variant_region_description: + INFO: + FieldKey: VARREGDESC + Number: . + Type: String + Description: "Variant Region Sequence Ontology ID" + From: DGVa ID: INFO: FieldKey: ID diff --git a/convert_gvf_to_vcf/vcfline.py b/convert_gvf_to_vcf/vcfline.py index f3be05f..faa74cb 100644 --- a/convert_gvf_to_vcf/vcfline.py +++ b/convert_gvf_to_vcf/vcfline.py @@ -4,7 +4,9 @@ from Bio import SeqIO from convert_gvf_to_vcf.assistingconverter import convert_gvf_attributes_to_vcf_values - +from convert_gvf_to_vcf.logger import logger +from dataclasses import dataclass +from typing import Optional,Union def extract_reference_allele(fasta_file, chromosome_name, position, end): @@ -25,6 +27,25 @@ def extract_reference_allele(fasta_file, chromosome_name, position, end): records_dictionary.close() return reference_allele +@dataclass +class VariantRange: + """ + This is responsible for representing the co-ordinate ranges of a structural variant. + Attributes: + pos: start + start_range_lower_bound: can be int "." or None + start_range_upper_bound: can be int "." or None + end: end + end_range_lower_bound: can be int "." or None + end_range_upper_bound: can be int "." or None + """ + pos: int + start_range_lower_bound: Optional[Union[int, str]] #value can be int, "." or None + start_range_upper_bound: Optional[Union[int, str]] #value can be int, "." or None + end: int + end_range_lower_bound: Optional[Union[int, str]] #value can be int, "." or None + end_range_upper_bound: Optional[Union[int, str]] #value can be int, "." or None + class VcfLineBuilder: """ This class is responsible for creating VcfLine objects that contain VCF datalines. @@ -128,7 +149,7 @@ def check_ref(self, ref_allele_to_be_checked): else: checked_reference_allele = ref_allele_to_be_checked else: - print("WARNING: Ref allele must be a string. Setting to missing value.", ref_allele_to_be_checked) + logger.warning(f"Ref allele must be a string: {ref_allele_to_be_checked}. Setting to missing value.") checked_reference_allele = "." return checked_reference_allele @@ -144,13 +165,13 @@ def get_ref(self, vcf_value_from_gvf_attribute, chrom, pos, end): if assembly_file: reference_allele = extract_reference_allele(assembly_file, chrom, pos, end) else: - print("WARNING: No reference provided. Placeholder inserted for Reference allele.") + logger.warning("No reference provided. Placeholder inserted for Reference allele.") reference_allele = "." if reference_allele != ".": reference_allele = self.check_ref(reference_allele) return reference_allele - def create_coordinate_range(self, vcf_value_from_gvf_attribute, pos, end): + def create_coordinate_range(self, vcf_value_from_gvf_attribute): """ Create the start and end range using the dictionary of GVF attributes and the pos and end :return: start_range_lower_bound, start_range_upper_bound, end_range_lower_bound, end_range_upper_bound """ @@ -172,120 +193,55 @@ def create_coordinate_range(self, vcf_value_from_gvf_attribute, pos, end): end_range_upper_bound = None return start_range_lower_bound, start_range_upper_bound, end_range_lower_bound, end_range_upper_bound - - def generate_symbolic_allele(self, vcf_value_from_gvf_attribute, pos, end, length, ref, so_type): - """ Generates the symbolic allele and stores the corresponding metainformation lines. - Also provides the dictionary for the info field that correspond to the symbolic allele - :return: symbolic_allele, info_dict - """ - symbolic_allele_id = self.reference_lookup.symbolic_allele_dictionary[so_type][1] - symbolic_allele = f'<{symbolic_allele_id}>' - - if symbolic_allele_id in self.all_possible_lines_dictionary["ALT"]: - self.field_lines_dictionary["ALT"].append(self.all_possible_lines_dictionary["ALT"][symbolic_allele_id]) - - # Creating start/end co-ordinate ranges - ( - start_range_lower_bound, start_range_upper_bound, - end_range_lower_bound, end_range_upper_bound - ) = self.create_coordinate_range(vcf_value_from_gvf_attribute, pos, end) - - info_dict, is_imprecise = self.generate_info_field_symbolic_allele(end, - end_range_lower_bound, - end_range_upper_bound, - length, pos, ref, - start_range_lower_bound, - start_range_upper_bound, - symbolic_allele) - - # for all variants (precise and imprecise) store INFO lines for the header - self.field_lines_dictionary["INFO"].append(self.all_possible_lines_dictionary["INFO"]["END"]) - self.field_lines_dictionary["INFO"].append(self.all_possible_lines_dictionary["INFO"]["SVLEN"]) - - # for imprecise variants only - if is_imprecise: - self.field_lines_dictionary["INFO"].append(self.all_possible_lines_dictionary["INFO"]["IMPRECISE"]) - self.field_lines_dictionary["INFO"].append(self.all_possible_lines_dictionary["INFO"]["CIPOS"]) - self.field_lines_dictionary["INFO"].append(self.all_possible_lines_dictionary["INFO"]["CIEND"]) - return symbolic_allele, info_dict - - def generate_info_field_symbolic_allele(self, end, - end_range_lower_bound, - end_range_upper_bound, - length, pos, ref, - start_range_lower_bound, - start_range_upper_bound, symbolic_allele): + def generate_info_field_symbolic_allele(self, + variant_range_coordinates, + length, ref, symbolic_allele): """ Generate the dictionary INFO field values for the symbolic allele. - :param end: gvf_feature_line_object.end - :param end_range_lower_bound: end range co-ordinate first number - :param end_range_upper_bound: end range co-ordinate second number - :param info_end_value: INFO field called "END" - default set to None + :param variant_range_coordinates: object with attribues:pos, start_range_lower_bound, start_range_upper_bound, end, end_range_lower_bound, end_range_upper_bound :param length: end - pos - :param pos: gvf_feature_line_object.start - :param ref: refernece allele - :param start_range_lower_bound: start range co-ordinate first number - :param start_range_upper_bound: start range co-ordinate second number + :param ref: reference allele :param symbolic_allele: symbolic allele for ALT :return: info_dict: dict of key-value pairs for INFO field (END, IMPRECISE, CIPOS, CIEND, SVLEN), is_imprecise: boolean """ - # setting up fields to be inserted into INFO. Default is None - info_end_key = "END" - info_end_value = None - info_imprecise_key = "IMPRECISE" - info_imprecise_value = None - info_cipos_key = "CIPOS" - info_cipos_value = None - info_ciend_key = "CIEND" - info_ciend_value = None - info_svlen_key = "SVLEN" - info_svlen_value = None + + # Set up info dictionary + info_dict = {} + # Setting up the info keys with None as a default value + for info_key in ["END", "IMPRECISE", "CIPOS", "CIEND", "SVLEN"]: + info_dict[info_key] = None if length: - info_svlen_value = str(length) - # Precise variant: keep all None values, set info_end_value and is_imprecise - if ( - start_range_lower_bound is None or - start_range_upper_bound is None or - end_range_lower_bound is None or - end_range_upper_bound is None - ): - info_end_value, is_imprecise = self.generate_info_field_for_precise_variant(pos, ref) + info_dict["SVLEN"] = str(length) + # Calculate precise or imprecise + range_keys = ["start_range_lower_bound", "start_range_upper_bound", "end_range_lower_bound", "end_range_upper_bound"] + # in the case of a PRECISE variant + if any(range_key is None for range_key in range_keys): + info_dict["END"], is_imprecise = self.generate_info_field_for_precise_variant(variant_range_coordinates.pos, ref) + # IMPRECISE variants else: - # Imprecise variant: change none values accordingly - (info_ciend_value, info_cipos_value, - info_end_value, info_imprecise_value, - is_imprecise) = self.generate_info_field_for_imprecise_variant( - end, - end_range_lower_bound, end_range_upper_bound, - info_end_value, - # info_ciend_value, info_cipos_value, info_end_value, info_imprecise_value, - length, pos, ref, - start_range_lower_bound, start_range_upper_bound, symbolic_allele) - # Set up INFO values for structural variants and store in the info_dict - info_dict = { - info_end_key: info_end_value, - info_imprecise_key: info_imprecise_value, - info_cipos_key: info_cipos_value, - info_ciend_key: info_ciend_value, - info_svlen_key: info_svlen_value - } + info_fields_for_imprecise_variants = self.generate_info_field_for_imprecise_variant( + variant_range_coordinates, + length, ref, + symbolic_allele + ) + info_dict.update({ + "CIEND": info_fields_for_imprecise_variants[0], + "CIPOS": info_fields_for_imprecise_variants[1], + "END": info_fields_for_imprecise_variants[2], + "IMPRECISE": info_fields_for_imprecise_variants[3]}) + # Set the imprecise flag + is_imprecise = info_fields_for_imprecise_variants[4] return info_dict, is_imprecise - def generate_info_field_for_imprecise_variant(self, end, end_range_lower_bound, end_range_upper_bound, - info_end_value, - length, pos, ref, start_range_lower_bound, start_range_upper_bound, + def generate_info_field_for_imprecise_variant(self, + variant_range_coordinates, + length, ref, symbolic_allele): """ Generate the INFO field values for an imprecise variant. - :param end: gvf_feature_line_object.end - :param end_range_lower_bound: end range co-ordinate first number - :param end_range_upper_bound: end range co-ordinate second number - :param info_end_value: INFO field called "END" - default set to None + :param variant_range_coordinates: object with attribues:pos, start_range_lower_bound, start_range_upper_bound, end, end_range_lower_bound, end_range_upper_bound :param length: end - pos - :param pos: gvf_feature_line_object.start :param ref: refernece allele - :param start_range_lower_bound: start range co-ordinate first number - :param start_range_upper_bound: start range co-ordinate second number :param symbolic_allele: symbolic allele for ALT :return: info_ciend_value(string i.e "0,100"), @@ -298,21 +254,23 @@ def generate_info_field_for_imprecise_variant(self, end, end_range_lower_bound, is_imprecise = True info_imprecise_value = "IMPRECISE" ciend_lower_bound, ciend_upper_bound, cipos_lower_bound, cipos_upper_bound = self.calculate_CIPOS_and_CIEND( - end, end_range_lower_bound, end_range_upper_bound, pos, start_range_lower_bound, start_range_upper_bound) + variant_range_coordinates) + # end, end_range_lower_bound, end_range_upper_bound, pos, start_range_lower_bound, start_range_upper_bound) # form the CIPOS value - info_cipos_value = str(cipos_lower_bound) + "," + str(cipos_upper_bound) + info_cipos_value = f"{str(cipos_lower_bound)},{str(cipos_upper_bound)}" if all(cipos_bound is not None for cipos_bound in (cipos_lower_bound, cipos_upper_bound)) else None # form the CIEND value - info_ciend_value = str(ciend_lower_bound) + "," + str(ciend_upper_bound) + info_ciend_value = f"{str(ciend_lower_bound)},{str(ciend_upper_bound)}" if all(ciend_bound is not None for ciend_bound in (cipos_lower_bound, cipos_upper_bound)) else None + # Determine the END value based on the symbolic allele if symbolic_allele == "": - info_end_value = str(pos + len(ref) - 1) - elif symbolic_allele in {"", "", "", ""}: - info_end_value = str(pos + length) + info_end_value = str(variant_range_coordinates.pos + len(ref) - 1) + elif any(sym in symbolic_allele for sym in {"DEL", "DUP", "INV", "CNV"}): + info_end_value = str(variant_range_coordinates.pos + length) elif symbolic_allele == "<*>": - info_end_value = str(pos + len(ref)) + info_end_value = str(variant_range_coordinates.pos + len(ref)) else: - print("Cannot identify symbolic allele") + logger.warning(f"Cannot identify symbolic allele: {symbolic_allele}") return info_ciend_value, info_cipos_value, info_end_value, info_imprecise_value, is_imprecise def generate_info_field_for_precise_variant(self, pos, ref): @@ -325,42 +283,130 @@ def generate_info_field_for_precise_variant(self, pos, ref): info_end_value = str(pos + len(ref) - 1) return info_end_value, is_imprecise - def calculate_CIPOS_and_CIEND(self, end, end_range_lower_bound, end_range_upper_bound, pos, start_range_lower_bound, - start_range_upper_bound): + def calculate_CIPOS_and_CIEND(self, + variant_range_cordinates): """Converts start and end range co-ordinates into CIPOS and CIEND (for VCF info field). - :param end: gvf_feature_line_object.end - :param end_range_lower_bound: end range co-ordinate first number - :param end_range_upper_bound: end range co-ordinate second number - :param pos: gvf_feature_line_object.start - :param start_range_lower_bound: start range co-ordinate first number - :param start_range_upper_bound: start range co-ordinate second number + :param variant_range_cordinates: object with attributes:pos, start_range_lower_bound, start_range_upper_bound, end, end_range_lower_bound, end_range_upper_bound :return: ciend_lower_bound, ciend_upper_bound, cipos_lower_bound, cipos_upper_bound """ # Converting start_range co-ordinates - # if the lower bound is unknown for the imprecise variant, pass this to CIPOS (i.e. CIPOS=.,0) - if start_range_lower_bound == ".": - cipos_lower_bound = "." - else: - cipos_lower_bound = int(start_range_lower_bound) - pos - # if the upper bound is unknown for the imprecise variant, pass this to CIPOS(i.e. CIPOS=0,.) - if start_range_upper_bound == ".": - cipos_upper_bound = "." - else: - cipos_upper_bound = int(start_range_upper_bound) - pos - + cipos_lower_bound = self.convert_to_ci_bound(variant_range_cordinates.pos, variant_range_cordinates.start_range_lower_bound) + cipos_upper_bound = self.convert_to_ci_bound(variant_range_cordinates.pos, variant_range_cordinates.start_range_upper_bound) # Converting End_range co-ordinates - # if the lower bound is unknown for the imprecise variant, pass this to CIPOS (i.e. CIPOS=.,0) - if end_range_lower_bound == ".": - ciend_lower_bound = "." - else: - ciend_lower_bound = int(end_range_lower_bound) - end - # if the upper bound is unknown for the imprecise variant, pass this to CIPOS(i.e. CIPOS=0,.) - if end_range_upper_bound == ".": - ciend_upper_bound = "." - else: - ciend_upper_bound = int(end_range_upper_bound) - end + ciend_lower_bound = self.convert_to_ci_bound(variant_range_cordinates.end, variant_range_cordinates.end_range_lower_bound) + ciend_upper_bound = self.convert_to_ci_bound(variant_range_cordinates.end, variant_range_cordinates.end_range_upper_bound) return ciend_lower_bound, ciend_upper_bound, cipos_lower_bound, cipos_upper_bound + def convert_to_ci_bound(self, start_or_end_position, original_coordinate_bound): + """ Logic for converting start or end range co-ordinates to CIPOS or CIEND + :param: start or end positon: (pos or end) + :param: original_coordinate bound: start/end range lower or upper bound + :param: cipos/ciend upper or lower bound + """ + # if the original bound is unknown for the imprecise variant, pass this to CIPOS or CIEND(i.e. CIPOS=.,0) + if original_coordinate_bound == ".": + ci_coordinate_bound = "." + # if the original bound is precise variant, set to CIPOS or CIEND to None + elif original_coordinate_bound is None: + ci_coordinate_bound = None + # if known imprecise coordinate, calculate the CIPOS/CIEND + else: + ci_coordinate_bound = int(original_coordinate_bound) - start_or_end_position + return ci_coordinate_bound + + def generate_symbolic_allele(self, vcf_value_from_gvf_attribute, pos, end, length, ref, so_type): + """ Generates the symbolic allele and stores the corresponding metainformation lines. + Also determines if variant is precise or imprecise. + :return: symbolic_allele, self.info, lines_standard_ALT, lines_standard_INFO + """ + # Get symbolic allele + symbolic_allele_id = self.reference_lookup.symbolic_allele_dictionary[so_type][1] + symbolic_allele = f'<{symbolic_allele_id}>' + + lines_standard_alt = self.field_lines_dictionary["ALT"] + lines_standard_info = self.field_lines_dictionary["INFO"] + all_possible_alt_lines = self.all_possible_lines_dictionary["ALT"] + all_possible_info_lines = self.all_possible_lines_dictionary["INFO"] + + if symbolic_allele_id in all_possible_alt_lines: + lines_standard_alt.append(all_possible_alt_lines[symbolic_allele_id]) + + # Creating start/end co-ordinate ranges + (start_range_lower_bound, start_range_upper_bound, + end_range_lower_bound, end_range_upper_bound) = self.create_coordinate_range(vcf_value_from_gvf_attribute) + variant_range_coordinates = VariantRange(pos=pos, + start_range_lower_bound=start_range_lower_bound, + start_range_upper_bound=start_range_upper_bound, + end=end, + end_range_lower_bound=end_range_lower_bound, + end_range_upper_bound=end_range_upper_bound) + + # Get the dictionary of INFO keys and their values + info_dict, is_imprecise = self.generate_info_field_symbolic_allele( + variant_range_coordinates, length, ref, symbolic_allele) + + # Set SVCLAIM depending on the symbolic allele + info_svclaim_key = "SVCLAIM" + if "DEL" in symbolic_allele or "DUP" in symbolic_allele: + # TODO: IMPORTANT: this should be set to the missing place holder '.' but await clarification from the spec + info_svclaim_value = "D" + info_dict.update({info_svclaim_key: info_svclaim_value}) + lines_standard_info.append(all_possible_info_lines["SVCLAIM"]) + + # for all variants (precise and imprecise) store INFO lines for the header + lines_standard_info.append(all_possible_info_lines["END"]) + lines_standard_info.append(all_possible_info_lines["SVLEN"]) + + # for imprecise variants only + if is_imprecise: + lines_standard_info.append(all_possible_info_lines["IMPRECISE"]) + lines_standard_info.append(all_possible_info_lines["CIPOS"]) + lines_standard_info.append(all_possible_info_lines["CIEND"]) + return symbolic_allele, info_dict, lines_standard_alt, lines_standard_info + + #TODO: awaiting clarification from the specification to keep or remove this function + def has_svclaim_abundance_evidence(self, vcf_value_from_gvf_attribute, alt, info_dict): + """ This functions determines if the GVF attributes contain evidence that the SVCLAIM should be set to Abundance (D) + :params: vcf_value_from_gvf_attribute: dictionary of GVF attributes + :params: alt: alternative allele (SVCLAIM mandatory for DEL/DUP) + :params: info_dict: dictionary of INFO keys and their values + return: boolean/none + """ + abundance_variant_method_keywords = ["Array_CGH", "Read_depth", "Depth_of_coverage", "SNP_Microarray", "Oligo_Microarray"] + abundance_variant_method_keywords_lowercase = {vm.casefold() for vm in abundance_variant_method_keywords} + + abundance_variant_region_description_keywords = ["Inferred", "Copy_number_variation", "CNV", "Gain", "Loss", "Dosage_sensitive", "Relative_copy_number"] + abundance_variant_region_description_keywords_lowercase = {vrd.casefold() for vrd in abundance_variant_region_description_keywords} + + abundance_variant_call_description_keywords = ["Inferred", "Copy_number_variation", "CNV", "Gain", "Loss", "Dosage_sensitive", "Relative_copy_number"] + abundance_variant_call_description_keywords_lowercase = {vcd.casefold() for vcd in abundance_variant_call_description_keywords} + + abundance_variant_seq_keywords = ["."] + # Region + # SO:0001019 = copy_number_variation + abundance_variant_region_so_id_keywords = ["SO:0001019"] + + # Call + # SO:0001742 = copy_number_gain + # SO:0001743 = copy_number_loss + # SO:1000173 = tandem_duplication + abundance_variant_call_so_id_keywords = ["SO:0001742", "SO:0001743", "SO:1000173"] + abundance_imprecise_keywords = [True] + + if "DEL" in alt or "DUP" in alt: + has_abundance_variant_method = vcf_value_from_gvf_attribute.get("Variant_Method", "").casefold() in abundance_variant_method_keywords_lowercase + has_abundance_variant_region_description = any(i in vcf_value_from_gvf_attribute.get("variant_region_description", "").casefold() for i in abundance_variant_region_description_keywords_lowercase) + has_abundance_call_description = any(i in vcf_value_from_gvf_attribute.get("variant_call_description", "").casefold() for i in abundance_variant_call_description_keywords_lowercase) + has_abundance_variant_seq = vcf_value_from_gvf_attribute.get("Variant_seq") in abundance_variant_seq_keywords + has_abundance_variant_region_so_id_keyword = vcf_value_from_gvf_attribute.get("variant_region_so_id") in abundance_variant_region_so_id_keywords + has_abundance_variant_call_so_id_keyword = vcf_value_from_gvf_attribute.get("variant_call_so_id") in abundance_variant_call_so_id_keywords + has_abundance_imprecise = info_dict.get("IMPRECISE") in abundance_imprecise_keywords + if any([has_abundance_variant_method, has_abundance_variant_region_description,has_abundance_call_description, has_abundance_variant_seq, has_abundance_variant_region_so_id_keyword, has_abundance_variant_call_so_id_keyword, has_abundance_imprecise]): + return True + else: + return False + return None + def get_alt(self, vcf_value_from_gvf_attribute, chrom, pos, end, length, ref, so_type): """ Gets the ALT allele for the VCF file :return: symbolic_allele, self.info, lines_standard_ALT, lines_standard_INFO @@ -369,7 +415,8 @@ def get_alt(self, vcf_value_from_gvf_attribute, chrom, pos, end, length, ref, so if any(base in vcf_value_from_gvf_attribute["Variant_seq"] for base in ["A", "C", "G", "T", "N"]): alt = vcf_value_from_gvf_attribute["Variant_seq"] elif vcf_value_from_gvf_attribute["Variant_seq"] == '.': - symbolic_allele, info_dict = self.generate_symbolic_allele(vcf_value_from_gvf_attribute, pos, end, length, ref, so_type) + symbolic_allele, info_dict, lines_standard_alt, lines_standard_info = self.generate_symbolic_allele(vcf_value_from_gvf_attribute, pos, end, length, ref, so_type) + if symbolic_allele is None: alt = "." elif (vcf_value_from_gvf_attribute["Variant_seq"] == "." or vcf_value_from_gvf_attribute["Variant_seq"] == "-") and symbolic_allele is not None: @@ -385,10 +432,12 @@ def get_alt(self, vcf_value_from_gvf_attribute, chrom, pos, end, length, ref, so ref = self.check_ref(ref) else: alt = "." - print("Cannot identify symbolic allele. Variant type is not supported.") + logger.warning(f"Cannot identify symbolic allele: {symbolic_allele}. Variant type is not supported.") else: alt = "." - print("Could not determine the alternative allele.") + logger.warning("Could not determine the alternative allele.") + #TODO: awaiting clarification to keep or remove this for SVCLAIM + #is_abundance = self.has_svclaim_abundance_evidence(vcf_value_from_gvf_attribute, alt, info_dict) return pos, ref, alt, info_dict @@ -422,17 +471,18 @@ def __str__(self): """ Creates and formats the VCF line. :return: string_to_return - the VCF line as a string """ - string_to_return = '\t'.join((self.chrom, - str(self.pos), - self.id, - self.ref, - self.alt, - self.qual, - self.filter, - self.format_info_string(), - ":".join(self.format_keys), - self.combine_format_values_by_sample_as_str() - )) + fields = [self.chrom, + str(self.pos), + self.id, + self.ref, + self.alt, + self.qual, + self.filter, + self.format_info_string()] + if self.format_keys != ["."]: + fields.append(":".join(self.format_keys)) + fields.append(self.combine_format_values_by_sample_as_str()) + string_to_return = '\t'.join(fields) return string_to_return def __eq__(self, other_vcf_line): @@ -449,6 +499,8 @@ def format_keys(self): for format_value in self.vcf_values_for_format.values() for format_key in format_value.keys() ]) + if len(set_of_format_key) == 0: + set_of_format_key = set('.') return self._order_format_keys(set_of_format_key) # a list of ordered format keys def merge_and_add(self, previous_element, current_element, delimiter): @@ -586,10 +638,12 @@ def merge(self, other_vcf_line, list_of_sample_names): """ # Merging ID, ALT and FILTER first merged_id = self.merge_and_add(self.id, other_vcf_line.id, ";") + sorted_merged_id_set = sorted(set(map(int, merged_id.split(";")))) + sorted_merged_id = ";".join(map(str, sorted_merged_id_set)) merged_alt = self.merge_and_add(self.alt, other_vcf_line.alt, ",") merged_filter = self.merge_and_add(self.filter, other_vcf_line.filter, ";") - self.id = other_vcf_line.id = merged_id + self.id = other_vcf_line.id = sorted_merged_id self.alt = other_vcf_line.alt = merged_alt self.filter = other_vcf_line.filter = merged_filter # Merging INFO using info_dict diff --git a/tests/test_assisting_converter.py b/tests/test_assisting_converter.py index 15779af..6fbedcc 100644 --- a/tests/test_assisting_converter.py +++ b/tests/test_assisting_converter.py @@ -95,7 +95,7 @@ def test_convert_gvf_attributes_to_vcf_values(self): expected_gvf_attribute_dictionary = {'ID': '1', 'Name': 'nssv1412199', 'Alias': 'CNV28955', 'parent': 'nsv811094', 'Start_range': ['.', '1'], 'End_range': ['2', '.'], 'sample_name': 'Wilds2-3', 'Genotype': '0:1'} assert gvf_attribute_dictionary == expected_gvf_attribute_dictionary # testing vcf_info_values - expected_info_values = {'ID': '1', 'NAME': 'nssv1412199', 'ALIAS': 'CNV28955'} + expected_info_values = {'ID': '1', 'NAME': 'nssv1412199', 'ALIAS': 'CNV28955', 'PARENT': 'nsv811094', 'SAMPLENAME': 'Wilds2-3'} assert vcf_info_values == expected_info_values # testing vcf_format_values expected_format_values = {'Wilds2-3': {'GT': '0:1'}} diff --git a/tests/test_convert_gvf_to_vcf.py b/tests/test_convert_gvf_to_vcf.py index b4f75b0..0d23c9c 100644 --- a/tests/test_convert_gvf_to_vcf.py +++ b/tests/test_convert_gvf_to_vcf.py @@ -8,7 +8,8 @@ compare_vcf_objects, \ determine_merge_or_keep_vcf_objects, merge_vcf_objects, parse_pragma, get_pragma_name_and_value, get_pragma_tokens, \ keep_vcf_objects, get_sample_name_from_pragma, get_unique_sample_names, convert_gvf_pragmas_to_vcf_header, \ - convert_gvf_pragma_comment_to_vcf_header, generate_vcf_header_structured_lines + convert_gvf_pragma_comment_to_vcf_header, generate_vcf_header_structured_lines, get_chrom_pos_of_vcf_object, \ + has_duplicates, get_list_of_merged_vcf_objects, filter_duplicates_by_merging class TestConvertGVFtoVCF(unittest.TestCase): @@ -103,9 +104,13 @@ def test_convert_gvf_pragmas_for_vcf_header(self): # the test below relates to the VCF headerline (Part 2) def test_generate_vcf_header_line(self): - header_fields = generate_vcf_header_line(['JenMale6', 'Wilds2-3', 'Zon9', 'JenMale7']) + header_fields = generate_vcf_header_line(False, self.ordered_list_of_samples) assert header_fields == '#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\tJenMale6\tWilds2-3\tZon9\tJenMale7' + header_fields = generate_vcf_header_line(True, self.ordered_list_of_samples) + assert header_fields == '#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO' + + # the tests below relate to the GVF header def test_parse_pragma(self): # testing: pragma has a name and value @@ -198,7 +203,9 @@ def test_merge_vcf_objects(self): assert merged_object.alt == "" assert merged_object.qual == "." assert merged_object.filter == "." - assert merged_object.info_dict == {'VARSEQ': '.', 'SVCID': 'CNV6230,CNV5711', 'END': '131', 'ALIAS': 'CNV6230,CNV5711', 'AC': '3', 'VARCALLSOID': 'SO:0001742', 'REMAP': '.69625,.85344', 'NAME': 'nssv1389474,nssv1388955', 'SVLEN': '4'} + assert len(merged_object.info_dict) == 15 + assert merged_object.info_dict["ALIAS"] == 'CNV6230,CNV5711' + assert merged_object.info_dict["NAME"] == 'nssv1389474,nssv1388955' def test_keep_vcf_objects(self): gvf_pragmas, gvf_pragma_comments, gvf_lines_obj_list = read_in_gvf_file(self.input_file) @@ -222,5 +229,57 @@ def test_determine_merge_or_keep_vcf_objects(self): assert merged_or_kept_objects[3].id == "13;14" assert merged_or_kept_objects[3].info_dict["NAME"] == "nssv1389474,nssv1388955" + def test_get_chrom_pos_of_vcf_object(self): + gvf_pragmas, gvf_pragma_comments, gvf_lines_obj_list = read_in_gvf_file(self.input_file) + header_standard_lines_dictionary, list_of_vcf_objects = convert_gvf_features_to_vcf_objects( + gvf_lines_obj_list, self.reference_lookup, self.ordered_list_of_samples + ) + obj = list_of_vcf_objects[0] + chrom_pos = get_chrom_pos_of_vcf_object(obj) + assert chrom_pos == ('chromosome1', 1) + + def test_has_duplicates(self): + gvf_pragmas, gvf_pragma_comments, gvf_lines_obj_list = read_in_gvf_file(self.input_file) + header_standard_lines_dictionary, list_of_vcf_objects = convert_gvf_features_to_vcf_objects( + gvf_lines_obj_list, self.reference_lookup, self.ordered_list_of_samples + ) + duplicate_flag, list_of_dup_chrom_pos = has_duplicates(list_of_vcf_objects) + assert duplicate_flag is True + assert list_of_dup_chrom_pos == [('chromosome1', 127), ('chromosome1', 127), ('chromosome1', 127)] + + def test_get_list_of_merged_vcf_objects(self): + gvf_pragmas, gvf_pragma_comments, gvf_lines_obj_list = read_in_gvf_file(self.input_file) + header_standard_lines_dictionary, list_of_vcf_objects = convert_gvf_features_to_vcf_objects( + gvf_lines_obj_list, self.reference_lookup, self.ordered_list_of_samples + ) + merge_or_kept_objects = get_list_of_merged_vcf_objects(list_of_vcf_objects, self.ordered_list_of_samples) + assert len(merge_or_kept_objects) == 5 + assert str(merge_or_kept_objects[0]) == "chromosome1 1 1 AC . . ID=1;NAME=nssv1412199;ALIAS=CNV28955;VARCALLSOID=SO:0001743;PARENT=nsv811094;SVCID=CNV28955;SAMPLENAME=Wilds2-3;REMAP=.98857;VARSEQ=.;END=2;IMPRECISE;SVLEN=1;SVCLAIM=D" + + def test_filter_duplicates_by_merging(self): + gvf_pragmas, gvf_pragma_comments, gvf_lines_obj_list = read_in_gvf_file(self.input_file) + header_standard_lines_dictionary, list_of_vcf_objects = convert_gvf_features_to_vcf_objects( + gvf_lines_obj_list, self.reference_lookup, self.ordered_list_of_samples + ) + # deliberately adding duplicates here + list_of_vcf_objects.append(list_of_vcf_objects[0]) + list_of_vcf_objects.append(list_of_vcf_objects[1]) + list_of_vcf_objects.append(list_of_vcf_objects[-1]) + + merge_or_kept_objects = get_list_of_merged_vcf_objects(list_of_vcf_objects, self.ordered_list_of_samples) + + list_of_vcf_objects_to_be_filtered = merge_or_kept_objects + duplicate_flag, list_of_dup_chrom_pos = has_duplicates(list_of_vcf_objects) + filtered_merge_or_kept_vcf_objects = filter_duplicates_by_merging(list_of_dup_chrom_pos,duplicate_flag, list_of_vcf_objects, list_of_vcf_objects_to_be_filtered, self.ordered_list_of_samples) + assert len(filtered_merge_or_kept_vcf_objects) <= len(merge_or_kept_objects) + assert filtered_merge_or_kept_vcf_objects[-1].chrom == "chromosome1" + assert filtered_merge_or_kept_vcf_objects[-1].pos == 127 + assert filtered_merge_or_kept_vcf_objects[-1].id == "14" + assert filtered_merge_or_kept_vcf_objects[-1].ref == "GT" + assert filtered_merge_or_kept_vcf_objects[-1].alt == "" + assert filtered_merge_or_kept_vcf_objects[-1].qual == "." + assert filtered_merge_or_kept_vcf_objects[-1].filter == "." + assert filtered_merge_or_kept_vcf_objects[-1].info_dict == {'ALIAS': 'CNV5711', 'SVCID': 'CNV5711', 'NAME': 'nssv1388955', 'REMAP': '.85344', 'DBXREF': 'mydata', 'SVLEN': '1', 'VARCALLSOID': 'SO:0001742', 'IMPRECISE': 'IMPRECISE', 'PARENT': 'nsv811095', 'SVCLAIM': 'D', 'CIPOS': '.,0', 'CIEND': '0,.', 'SAMPLENAME': 'JenMale6,JenMale7', 'VARSEQ': '.', 'AD': '3', 'END': '129', 'AC': '3'} + assert filtered_merge_or_kept_vcf_objects[-1].vcf_values_for_format == {'JenMale7': {'AD': '3', 'GT': '0:1'}} if __name__ == '__main__': unittest.main() diff --git a/tests/test_vcfline.py b/tests/test_vcfline.py index 9c8af1e..b036a1a 100644 --- a/tests/test_vcfline.py +++ b/tests/test_vcfline.py @@ -1,11 +1,9 @@ import os import unittest -from Bio.PDB import refmat - from convert_gvf_to_vcf.convertGVFtoVCF import generate_vcf_header_structured_lines from convert_gvf_to_vcf.gvffeature import GvfFeatureline -from convert_gvf_to_vcf.vcfline import VcfLine, VcfLineBuilder +from convert_gvf_to_vcf.vcfline import VcfLine, VcfLineBuilder, VariantRange from convert_gvf_to_vcf.lookup import Lookup class TestVcfLineBuilder(unittest.TestCase): @@ -60,7 +58,7 @@ def setUp(self): def test_build_vcf_line(self): # Set up GVF line object - gvf_attributes = 'ID=1;Name=nssv1412199;Alias=CNV28955;variant_call_so_id=SO:0001743;parent=nsv811094;Start_range=.,776614;End_range=786127,.;submitter_variant_call_id=CNV28955;sample_name=Wilds2-3;remap_score=.98857;Variant_seq=.' + gvf_attributes = ('ID=1;Name=nssv1412199;Alias=CNV28955;variant_call_so_id=SO:0001743;parent=nsv811094;Start_range=.,776614;End_range=786127,.;submitter_variant_call_id=CNV28955;sample_name=Wilds2-3;remap_score=.98857;Variant_seq=.') gvf_line_object = GvfFeatureline('chromosome1', 'DGVa', 'copy_number_loss', '77', '78', '.', '+', '.', gvf_attributes ) vcf_line = self.vcf_builder.build_vcf_line(gvf_line_object) assert vcf_line.chrom == 'chromosome1' @@ -87,33 +85,88 @@ def test_get_ref(self): def test_create_coordinate_range(self): # with range - vcf_value_from_gvf_attribute_with_range={'Start_range': ['.', '1'], 'End_range': ['2', '.']} - start_range_lower_bound, start_range_upper_bound, end_range_lower_bound, end_range_upper_bound = self.vcf_builder.create_coordinate_range(vcf_value_from_gvf_attribute_with_range, pos=int(1),end=int(2)) + vcf_value_from_gvf_attribute_with_range={'ID': '1', 'Name': 'nssv1412199', 'Alias': 'CNV28955', 'variant_call_so_id': 'SO:0001743', + 'parent': 'nsv811094', "Start_range":[".","776614"],"End_range":["786127","."], + 'submitter_variant_call_id': 'CNV28955', 'sample_name': 'Wilds2-3', 'remap_score': '.98857', + 'Variant_seq': '.'} + start_range_lower_bound, start_range_upper_bound, end_range_lower_bound, end_range_upper_bound = self.vcf_builder.create_coordinate_range(vcf_value_from_gvf_attribute_with_range) assert start_range_lower_bound == "." - assert start_range_upper_bound == "1" - assert end_range_lower_bound == "2" + assert start_range_upper_bound == "776614" + assert end_range_lower_bound == "786127" assert end_range_upper_bound == "." # without range - vcf_value_from_gvf_attribute_without_range={} + vcf_value_from_gvf_attribute_without_range={'ID': '1', 'Name': 'nssv1412199', 'Alias': 'CNV28955', 'variant_call_so_id': 'SO:0001743', + 'parent': 'nsv811094', + 'submitter_variant_call_id': 'CNV28955', 'sample_name': 'Wilds2-3', 'remap_score': '.98857', + 'Variant_seq': '.'} start_range_lower_bound, start_range_upper_bound, end_range_lower_bound, end_range_upper_bound = self.vcf_builder.create_coordinate_range( - vcf_value_from_gvf_attribute_with_range, pos=1, end=2) - assert start_range_lower_bound == "." - assert start_range_upper_bound == "1" - assert end_range_lower_bound == "2" - assert end_range_upper_bound == "." + vcf_value_from_gvf_attribute_without_range) + assert start_range_lower_bound is None + assert start_range_upper_bound is None + assert end_range_lower_bound is None + assert end_range_upper_bound is None + + def test_has_svclaim_abundance_evidence(self): + # full values + vcf_value_from_gvf_attribute= {'ID': '94', 'Name': 'essv7098719', 'Alias': 'TD_DGRP_795_RAL-357', + 'variant_call_so_id': 'SO:1000173', 'parent': 'esv2825690', + 'submitter_variant_call_id': 'TD_DGRP_795_RAL-357', 'sample_name': 'RAL-357', + 'Variant_seq': '.'} + alt = "" + info_dict = {'END': '118289', 'IMPRECISE': None, 'CIPOS': None, 'CIEND': None, 'SVLEN': '187'} + is_abundant = self.vcf_builder.has_svclaim_abundance_evidence(vcf_value_from_gvf_attribute, alt, info_dict) + assert is_abundant is True + #testing each value individually because if any of these values are True, the function returns True + vcf_value_from_gvf_attribute = {'Variant_Method': 'Array_CGH'} + is_abundant = self.vcf_builder.has_svclaim_abundance_evidence(vcf_value_from_gvf_attribute, alt, info_dict) + assert is_abundant == True + vcf_value_from_gvf_attribute = {'variant_region_description': 'inferred something'} + is_abundant = self.vcf_builder.has_svclaim_abundance_evidence(vcf_value_from_gvf_attribute, alt, info_dict) + assert is_abundant == True + vcf_value_from_gvf_attribute = {'variant_call_description': 'have inferred something else'} + is_abundant = self.vcf_builder.has_svclaim_abundance_evidence(vcf_value_from_gvf_attribute, alt, info_dict) + assert is_abundant == True + vcf_value_from_gvf_attribute = {'Variant_seq': '.'} + is_abundant = self.vcf_builder.has_svclaim_abundance_evidence(vcf_value_from_gvf_attribute,alt,info_dict) + assert is_abundant == True + vcf_value_from_gvf_attribute = {'variant_region_so_id': 'SO:0001019'} + is_abundant = self.vcf_builder.has_svclaim_abundance_evidence(vcf_value_from_gvf_attribute,alt,info_dict) + assert is_abundant == True + vcf_value_from_gvf_attribute = {'variant_call_so_id': 'SO:0001742'} + is_abundant = self.vcf_builder.has_svclaim_abundance_evidence(vcf_value_from_gvf_attribute,alt,info_dict) + assert is_abundant == True + info_dict = {'IMPRECISE': True} + is_abundant = self.vcf_builder.has_svclaim_abundance_evidence(vcf_value_from_gvf_attribute,alt,info_dict) + assert is_abundant == True + + def test_get_alt(self): + 'chromosome1 DGVa copy_number_loss 77 78 . + . ID=1;Name=nssv1412199;Alias=CNV28955;variant_call_so_id=SO:0001743;parent=nsv811094;Start_range=.,776614;End_range=786127,.;submitter_variant_call_id=CNV28955;sample_name=Wilds2-3;remap_score=.98857;Variant_seq=."' + vcf_value_from_gvf_attribute = {"Variant_seq":".", "Start_range":["." ,"776614"],"End_range":["786127","."]} + pos, ref, alt, info_dict = self.vcf_builder.get_alt(vcf_value_from_gvf_attribute, chrom='chromosome1', pos=77, end=78, length=1, ref='', so_type='copy_number_loss') + assert pos == 76 + assert ref == 'T' + assert alt == '' + assert info_dict == {'END': '76', 'IMPRECISE': None, 'CIPOS': None, 'CIEND': None, 'SVLEN': '1'} def test_generate_symbolic_allele(self): # TODO: This seems incorrect - vcf_value_from_gvf_attribute = {"Variant_seq":".", "Start_range":".,776614","End_range":"786127,."} + vcf_value_from_gvf_attribute = {"Variant_seq":".", "Start_range":[".","776614"],"End_range":["786127","."]} symbolic_allele, info_dict, lines_standard_alt, lines_standard_info = self.vcf_builder.generate_symbolic_allele(vcf_value_from_gvf_attribute, pos=76, end=77, length=1, ref='.', so_type='copy_number_loss') assert symbolic_allele == '' - assert info_dict == {'END': '76', 'IMPRECISE': None, 'CIPOS': None, 'CIEND': None, 'SVLEN': '1'} + assert info_dict == {'END': '77', 'IMPRECISE': 'IMPRECISE', 'CIPOS': '.,776538', 'CIEND': '786050,.', 'SVLEN': '1', 'SVCLAIM': 'D'} assert lines_standard_alt == ['##ALT='] - assert lines_standard_info== [ + assert lines_standard_info == [ + '##INFO=', '##INFO=', - '##INFO=' + '##INFO=', + '##INFO=', + '##INFO=', + '##INFO=' ] + + + def test_generate_info_field_symbolic_allele(self): end = 100 end_range_lower_bound = 100 @@ -125,11 +178,9 @@ def test_generate_info_field_symbolic_allele(self): start_range_lower_bound = 1 start_range_upper_bound = 2 symbolic_allele = "" + variant_range_coordinates= VariantRange(pos,start_range_lower_bound, start_range_upper_bound, end, end_range_lower_bound, end_range_upper_bound) info_dict, is_imprecise = self.vcf_builder.generate_info_field_symbolic_allele( - end, end_range_lower_bound, end_range_upper_bound, - length, pos, ref, - start_range_lower_bound, start_range_upper_bound, - symbolic_allele + variant_range_coordinates ,length, ref, symbolic_allele ) assert info_dict == {'END': '13', 'IMPRECISE': 'IMPRECISE', 'CIPOS': '0,1', 'CIEND': '0,100', 'SVLEN': '13'} assert is_imprecise is True @@ -145,13 +196,12 @@ def test_generate_info_field_for_imprecise_variant(self): start_range_lower_bound = 1 start_range_upper_bound = 2 symbolic_allele = "" + + variant_range_coordinates= VariantRange(pos,start_range_lower_bound, start_range_upper_bound, end, end_range_lower_bound, end_range_upper_bound) info_ciend_value, info_cipos_value, info_end_value, info_imprecise_value, is_imprecise = self.vcf_builder.generate_info_field_for_imprecise_variant( - end, end_range_lower_bound, end_range_upper_bound, - info_end_value, - length, pos, ref, - start_range_lower_bound, start_range_upper_bound, - symbolic_allele - ) + variant_range_coordinates, + length, ref, + symbolic_allele) assert info_ciend_value == "0,100" assert info_cipos_value == "0,1" assert info_end_value == "13" @@ -173,28 +223,39 @@ def test_calculate_CIPOS_and_CIEND(self): pos=1 start_range_lower_bound=1 start_range_upper_bound=10 - ciend_lower_bound, ciend_upper_bound, cipos_lower_bound, cipos_upper_bound = self.vcf_builder.calculate_CIPOS_and_CIEND(end, end_range_lower_bound, end_range_upper_bound, pos, start_range_lower_bound, start_range_upper_bound) + variant_range_coordinates= VariantRange(pos,start_range_lower_bound, start_range_upper_bound, end, end_range_lower_bound, end_range_upper_bound) + ciend_lower_bound, ciend_upper_bound, cipos_lower_bound, cipos_upper_bound = self.vcf_builder.calculate_CIPOS_and_CIEND(variant_range_coordinates) assert (ciend_lower_bound, ciend_upper_bound, cipos_lower_bound, cipos_upper_bound) == (-11, 0, 0, 9) - # unknown precise variant + # unknown imprecise variant end=122 end_range_lower_bound=122 end_range_upper_bound="." pos=1 start_range_lower_bound="." start_range_upper_bound=10 - ciend_lower_bound, ciend_upper_bound, cipos_lower_bound, cipos_upper_bound = self.vcf_builder.calculate_CIPOS_and_CIEND(end, end_range_lower_bound, end_range_upper_bound, pos, start_range_lower_bound, start_range_upper_bound) + variant_range_coordinates= VariantRange(pos,start_range_lower_bound, start_range_upper_bound, end, end_range_lower_bound, end_range_upper_bound) + ciend_lower_bound, ciend_upper_bound, cipos_lower_bound, cipos_upper_bound = self.vcf_builder.calculate_CIPOS_and_CIEND(variant_range_coordinates) assert (ciend_lower_bound, ciend_upper_bound, cipos_lower_bound, cipos_upper_bound) == (0, ".", ".", 9) - + # precise variant + end=122 + end_range_lower_bound = None + end_range_upper_bound = None + pos= 1 + start_range_lower_bound = None + start_range_upper_bound = None + variant_range_coordinates= VariantRange(pos,start_range_lower_bound, start_range_upper_bound, end, end_range_lower_bound, end_range_upper_bound) + ciend_lower_bound, ciend_upper_bound, cipos_lower_bound, cipos_upper_bound = self.vcf_builder.calculate_CIPOS_and_CIEND(variant_range_coordinates) + assert (ciend_lower_bound, ciend_upper_bound, cipos_lower_bound, cipos_upper_bound) == (None, None, None, None) def test_get_alt(self): 'chromosome1 DGVa copy_number_loss 77 78 . + . ID=1;Name=nssv1412199;Alias=CNV28955;variant_call_so_id=SO:0001743;parent=nsv811094;Start_range=.,776614;End_range=786127,.;submitter_variant_call_id=CNV28955;sample_name=Wilds2-3;remap_score=.98857;Variant_seq=."' - vcf_value_from_gvf_attribute = {"Variant_seq": ".", "Start_range": ".,776614", "End_range": "786127,."} + vcf_value_from_gvf_attribute = {"Variant_seq": ".", "Start_range": [".","776614"], "End_range": ["786127","."]} pos, ref, alt, info_dict = self.vcf_builder.get_alt(vcf_value_from_gvf_attribute, chrom='chromosome1', pos=77, end=78, length=1, ref='', so_type='copy_number_loss') assert pos == 76 assert ref == 'T' assert alt == '' - assert info_dict == {'END': '76', 'IMPRECISE': None, 'CIPOS': None, 'CIEND': None, 'SVLEN': '1'} + assert info_dict == {'END': '78', 'IMPRECISE': 'IMPRECISE', 'CIPOS': '.,776537', 'CIEND': '786049,.', 'SVLEN': '1', 'SVCLAIM': 'D'} def test_check_ref(self): assert self.vcf_builder.check_ref('A') == 'A'