-
Notifications
You must be signed in to change notification settings - Fork 2
Iterate #27
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Iterate #27
Changes from all commits
f73f0c6
4e11667
10a54e4
0db4b68
9575e68
d0f3f52
7ca6d01
26d8d65
6d1b546
00289e0
7ec2990
4be6a50
942da4e
eff50f0
ee08a3e
6721b64
e48e151
57cd7c0
10c5277
c514eb1
6123fde
085ae47
6634f7f
0de41c4
2841ab5
cc70045
2d08696
5f1dd11
1390892
3d186cb
7949312
7fd761d
79254f4
94eec07
bb36a06
ed605ba
dd3f5c5
9333761
3da35ac
5170743
7eea89b
f78ad96
33dc9ff
3e40277
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -166,15 +166,21 @@ def convert_gvf_pragmas_for_vcf_header(gvf_pragmas, | |
| return unique_converted_pragmas, unique_sample_name | ||
|
|
||
| # the function below relates to the VCF headerline (Part 2) | ||
| def generate_vcf_header_line(samples): | ||
| def generate_vcf_header_line(is_missing_format, samples): | ||
| """ Generates the VCF header line using the nine mandatory headers and the sample names. | ||
| :param is_missing_format: boolean (False = Format + sample names, True = mandatory fields only) | ||
| :param samples: list of samples, these will appear in the header line | ||
| :return: vcf_header: a string | ||
| """ | ||
| vcf_header_fields = ["#CHROM", "POS", "ID", "REF", "ALT", "QUAL", "FILTER", "INFO", "FORMAT"] | ||
| for sample in samples: | ||
| vcf_header_fields.append(sample) | ||
| vcf_header = '\t'.join(vcf_header_fields) | ||
| if is_missing_format: | ||
| vcf_header_fields = ["#CHROM", "POS", "ID", "REF", "ALT", "QUAL", "FILTER", "INFO"] | ||
| vcf_header = '\t'.join(vcf_header_fields) | ||
| else: | ||
|
|
||
| vcf_header_fields = ["#CHROM", "POS", "ID", "REF", "ALT", "QUAL", "FILTER", "INFO", "FORMAT"] | ||
| for sample in samples: | ||
| vcf_header_fields.append(sample) | ||
| vcf_header = '\t'.join(vcf_header_fields) | ||
| return vcf_header | ||
|
|
||
| # the functions below relate to the GVF header | ||
|
|
@@ -259,6 +265,22 @@ def convert_gvf_features_to_vcf_objects(gvf_lines_obj_list, reference_lookup, or | |
| return standard_header_lines, list_of_vcf_objects | ||
|
|
||
| # The functions below relate to the VCF objects | ||
| def collect_missing_format_flags(list_of_vcf_objects): | ||
| """ Returns a list of booleans for each VCF object (True if format is missing, False if format keys present) | ||
| :params: list_of_vcf_objects: list of vcf objects | ||
| :return: missing_format_flags: list of booleans (True if format is missing, False if format keys present) | ||
| """ | ||
| missing_format_flags = [] | ||
| for index in range(1, len(list_of_vcf_objects)): | ||
| if list_of_vcf_objects[index].format_keys == ['.']: | ||
| format_flag = True | ||
| missing_format_flags.append(format_flag) | ||
| else: | ||
| format_flag = False | ||
| missing_format_flags.append(format_flag) | ||
| return missing_format_flags | ||
|
|
||
|
|
||
| def compare_vcf_objects(list_of_vcf_objects): | ||
| """ Compares VCF objects in the list with the VCF object before it. Returns boolean values. | ||
| :params: list_of_vcf_objects: list of vcf objects | ||
|
|
@@ -314,6 +336,69 @@ def determine_merge_or_keep_vcf_objects(list_of_vcf_objects, comparison_results, | |
| merge_or_kept_objects.append(list_of_vcf_objects[-1]) | ||
| return merge_or_kept_objects | ||
|
|
||
| def get_chrom_pos_of_vcf_object(obj): | ||
| """ | ||
| Returns chromosome and position of an object. Used to help sort VCF object by chromosome name and by numeric position. | ||
| :params: obj : vcf line object | ||
| :return: obj.chrom, obj.pos | ||
| """ | ||
| return (obj.chrom, int(obj.pos)) | ||
|
|
||
| def has_duplicates(list_of_objects): | ||
| """ Checks chrom and pos of vcf line objects to see if there are duplicates. If list of vcf object has duplicates, merge again. | ||
| :params: list_of_objects | ||
| :return: duplicate_flag - boolean value (True = has duplicates so merge again, False= no duplicates) | ||
| """ | ||
| duplicate_flag = False | ||
| list_of_duplicate_chrom_pos = [] | ||
| seen_chrom_pos = set() | ||
| for obj in list_of_objects: | ||
| chrom_pos = (obj.chrom, int(obj.pos)) | ||
| if chrom_pos in seen_chrom_pos: | ||
| # returns true to allow to merge again | ||
| duplicate_flag = True | ||
| list_of_duplicate_chrom_pos.append(chrom_pos) | ||
| else: | ||
| seen_chrom_pos.add(chrom_pos) | ||
| return duplicate_flag, list_of_duplicate_chrom_pos | ||
|
|
||
|
|
||
| def get_list_of_merged_vcf_objects(list_of_vcf_objects, samples): | ||
| """ Compares VCF objects, merges VCF objects, sorts VCF objects. This gives a list of sorted merged vcf objects. | ||
| :params: list_of_vcf_objects | ||
| :params: samples | ||
| :returns: merge_or_kept_vcf_objects | ||
| """ | ||
| comparison_flags = compare_vcf_objects(list_of_vcf_objects) # Identifies which VCF objects to merge | ||
| merge_or_kept_vcf_objects = determine_merge_or_keep_vcf_objects(list_of_vcf_objects, comparison_flags, samples) | ||
| merge_or_kept_vcf_objects.sort(key=get_chrom_pos_of_vcf_object) # sorting by chromosome and position | ||
| return merge_or_kept_vcf_objects | ||
|
|
||
| def filter_duplicates_by_merging(chrom_pos_list, has_dups, list_of_vcf_objects, | ||
| list_of_vcf_objects_to_be_filtered, samples): | ||
| """ | ||
| :params: chrom_pos_list: list of tuples - this represents duplicate positions | ||
| :params: has_dups: boolean - True if it contains duplicates | ||
| :params: list_of_vcf_objects: list of VCF objects (GVF converted to VCF; with no merging/remove dups) | ||
| :params: list_of_vcf_objects_to_be_filtered: list of VCF objects from a previous merge | ||
| :params: samples: names | ||
| :returns: filtered_merge_or_kept_vcf_objects - list of vcf objects with duplicates from this iteration removed | ||
| """ | ||
| if has_dups: | ||
| for chrom_pos in chrom_pos_list: | ||
| chrom_to_search = chrom_pos[0] | ||
| pos_to_search = chrom_pos[1] | ||
| vcf_objects_to_merge = [] | ||
| for vcf_object in list_of_vcf_objects: | ||
| if vcf_object.chrom == chrom_to_search and vcf_object.pos == pos_to_search: | ||
| vcf_objects_to_merge.append(vcf_object) | ||
|
|
||
| merge_duplicates = get_list_of_merged_vcf_objects(vcf_objects_to_merge, samples) | ||
| filtered_merge_or_kept_vcf_objects = [x for x in list_of_vcf_objects_to_be_filtered if x not in vcf_objects_to_merge] | ||
| filtered_merge_or_kept_vcf_objects.extend(merge_duplicates) | ||
| filtered_merge_or_kept_vcf_objects.sort(key=get_chrom_pos_of_vcf_object) | ||
| return filtered_merge_or_kept_vcf_objects | ||
|
|
||
| def main(): | ||
| # Parse command line arguments | ||
| parser = argparse.ArgumentParser() | ||
|
|
@@ -375,26 +460,51 @@ def main(): | |
| for header_line in header_type: | ||
| vcf_output.write(f"{header_line}\n") | ||
|
|
||
| # Part 2 of VCF file: Write the VCF header line. This is the nine mandatory fields with its sample names. | ||
| header_fields = generate_vcf_header_line(samples) | ||
| # Part 2 of VCF file: Write the VCF header line. | ||
| # Determine if the header is the 8 mandatory fields or 8 mandatory fields + FORMAT + sample names. | ||
| missing_flags = collect_missing_format_flags(list_of_vcf_objects) # True if format keys are missing, False if present | ||
| if all(missing_flags): | ||
| is_missing_format_value = True | ||
| logger.info("No Format Keys detected. Printing mandatory VCF headers.") | ||
| else: | ||
| is_missing_format_value = False | ||
| # Write the header. | ||
| header_fields = generate_vcf_header_line(is_missing_format=is_missing_format_value, samples=samples) | ||
| vcf_output.write(f"{header_fields}\n") | ||
|
|
||
| # Part 3 of VCF file: Write the VCF data lines. This will contain info about the position in the genome, | ||
| # its variants and genotype information per sample. | ||
| if len(gvf_lines_obj_list) > 0: | ||
| if (list_of_vcf_objects): | ||
| logger.info("Generating the VCF datalines") | ||
| # Each GVF feature has been converted to a VCF object so begin comparing and merging the VCF objects. | ||
| comparison_flags = compare_vcf_objects(list_of_vcf_objects) # Identifies which VCF objects to merge | ||
| merge_or_kept_vcf_objects = determine_merge_or_keep_vcf_objects(list_of_vcf_objects, comparison_flags, samples) | ||
| # initial merge | ||
| merge_or_kept_vcf_objects = get_list_of_merged_vcf_objects(list_of_vcf_objects, samples) | ||
| # identify if duplicates are present after merging | ||
| has_dups, chrom_pos_list = has_duplicates(merge_or_kept_vcf_objects) | ||
| # while duplicates are present, merge, then re-check for dups | ||
| max_iterations = 100 | ||
| iteration = 0 | ||
| list_of_vcf_objects_to_be_filtered = merge_or_kept_vcf_objects | ||
| while has_dups and iteration < max_iterations: | ||
| filtered_merge_or_kept_vcf_objects = filter_duplicates_by_merging(chrom_pos_list, has_dups, | ||
| list_of_vcf_objects, | ||
| list_of_vcf_objects_to_be_filtered, samples) | ||
| has_dups, chrom_pos_list = has_duplicates(filtered_merge_or_kept_vcf_objects) | ||
| iteration += 1 | ||
| list_of_vcf_objects_to_be_filtered = filtered_merge_or_kept_vcf_objects | ||
| logger.info(f"Iteration of merge (remove dups): {iteration}") | ||
|
Comment on lines
+481
to
+495
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the reason why the merging algorithm not capable of removing all the merge in one pass ?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The merge function compares the current line with the previous line (it is limited to 2 lines in its comparison and merge). For example:
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Ok so now that you have identified the limitation you should work removing it rather engineer something around.
|
||
|
|
||
| # Write the VCF objects as data lines in the VCF file. | ||
| for vcf_line_object in merge_or_kept_vcf_objects: | ||
| vcf_output.write(str(vcf_line_object) + "\n") | ||
| # vcf_output.write("\t".join(str(val) for val in line) + "\n") | ||
| if iteration != 0: | ||
| for vcf_line_object in filtered_merge_or_kept_vcf_objects: | ||
| vcf_output.write(str(vcf_line_object) + "\n") | ||
| else: | ||
| for vcf_line_object in merge_or_kept_vcf_objects: | ||
| vcf_output.write(str(vcf_line_object) + "\n") | ||
| else: | ||
| logger.warning("No feature lines were found for this GVF file.") | ||
| vcf_output.close() | ||
| logger.info("GVF to VCF conversion complete") | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| main() | ||
Uh oh!
There was an error while loading. Please reload this page.