From c1c38547fee6fa565d56d0b984c4e06eabd5c98c Mon Sep 17 00:00:00 2001 From: Alex K <777ak888@gmail.com> Date: Wed, 8 Oct 2025 20:15:04 +0000 Subject: [PATCH 01/12] new file: bio_files_processor.py file for new functions --- bio_files_processor.py | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 bio_files_processor.py diff --git a/bio_files_processor.py b/bio_files_processor.py new file mode 100644 index 0000000..28cb40c --- /dev/null +++ b/bio_files_processor.py @@ -0,0 +1,5 @@ +def convert_multiline_fasta_to_oneline(): + pass + +def parse_blast_output(): + pass From 8ea45ce15eebbee03a5c8b8e65c6be0936bd7c92 Mon Sep 17 00:00:00 2001 From: Alex K <777ak888@gmail.com> Date: Thu, 9 Oct 2025 07:21:10 +0000 Subject: [PATCH 02/12] modified: main.py added functionality for file processing --- main.py | 93 +++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 68 insertions(+), 25 deletions(-) diff --git a/main.py b/main.py index 5c9a141..b99165f 100644 --- a/main.py +++ b/main.py @@ -1,4 +1,7 @@ from typing import Union +from os import path +from sys import exit +from pathlib import Path import modules.dna_rna_tools import modules.fastq_tools @@ -55,20 +58,24 @@ def run_dna_rna_tools(*seq_data): return rez -def filter_fastq(seqs: dict, gc_bounds: Union[int, tuple] = (0, 100), length_bounds: Union[int, tuple] = (0, 2**32), quality_threshold: int = 0) -> dict: +def filter_fastq(input_fastq: str, output_fastq: str, gc_bounds: Union[int, tuple] = (0, 100), length_bounds: Union[int, tuple] = (0, 2**32), quality_threshold: int = 0) -> None: ''' A function working with fastq sequences. All bounds is included. Quality in Phred33. Input: - seqs: dict of a fastq sequences key: sequence_name string, value: tupple: (sequence: str, quality: str) + input_fastq: file with the input sequences + output_fastq: file to store filtered sequences a special directory "filtered" will be created for it Arguments: gc_bounds: tuple = (0, 100) # bound included length_bounds: tuple = (0, 2**32) # bound included quality_threshold: int = 0 # in phred33 + Intermediate: + seqs: dict of a fastq sequences key: sequence_name string, value: tupple: (sequence: str, quality: str) + Returns: dictionary consisting only of sequences that satisfy all conditions. @@ -76,29 +83,65 @@ def filter_fastq(seqs: dict, gc_bounds: Union[int, tuple] = (0, 100), length_bou exceptions if something went wrong. ''' - if not isinstance(seqs, dict): raise TypeError("seqs must be a dictionary") - if isinstance(gc_bounds, int): gc_bounds = (0, gc_bounds) - if isinstance(length_bounds, int): length_bounds = (0, length_bounds) - - rez_gc_bounds = {} - - for key in seqs.keys(): - if (gc_bounds[0] <= modules.fastq_tools.gc_count(seqs[key][0])) and (modules.fastq_tools.gc_count(seqs[key][0]) <= gc_bounds[1]): - rez_gc_bounds[key] = seqs[key] - - rez_length_bounds = {} - - for key in rez_gc_bounds.keys(): - if (length_bounds[0] <= len(seqs[key][0])) and (len(seqs[key][0]) <= length_bounds[1]): - rez_length_bounds[key] = rez_gc_bounds[key] - - rez_quality_threshold = {} - - for key in rez_length_bounds.keys(): - if (modules.fastq_tools.average_quality(seqs[key]) >= quality_threshold): - rez_quality_threshold[key] = rez_length_bounds[key] - - return rez_quality_threshold + path_to_write = Path("filtered", f"{output_fastq}") + if path_to_write.is_file(): + is_overwtire = input(f"The file {path_to_write} already exists, want to overwrite it? Y/N") + if is_overwtire in {'Y', 'y'}: path_to_write.unlink() + else: exit() + + with open(input_fastq, "r") as file: + while True: + seq_id = file.readline() + if not seq_id: + print(f"processing for {input_fastq} finished, filtration results in {path_to_write}") + break + else: + seq_id = seq_id.strip() + seq_seq = file.readline().strip() + seq_plus = file.readline().strip() + seq_qual = file.readline().strip() + seqs = {seq_id: (seq_seq, seq_qual)} + #print(seqs) + + + if not isinstance(seqs, dict): raise TypeError("seqs must be a dictionary") + if isinstance(gc_bounds, int): gc_bounds = (0, gc_bounds) + if isinstance(length_bounds, int): length_bounds = (0, length_bounds) + + rez_gc_bounds = {} + + for key in seqs.keys(): + #print(gc_bounds[0], seqs[key][0], modules.fastq_tools.gc_count(seqs[key][0])) + if (gc_bounds[0] <= modules.fastq_tools.gc_count(seqs[key][0])) and (modules.fastq_tools.gc_count(seqs[key][0]) <= gc_bounds[1]): + rez_gc_bounds[key] = seqs[key] + + rez_length_bounds = {} + + for key in rez_gc_bounds.keys(): + if (length_bounds[0] <= len(seqs[key][0])) and (len(seqs[key][0]) <= length_bounds[1]): + rez_length_bounds[key] = rez_gc_bounds[key] + + rez_quality_threshold = {} + + for key in rez_length_bounds.keys(): + if (modules.fastq_tools.average_quality(seqs[key]) >= quality_threshold): + rez_quality_threshold[key] = rez_length_bounds[key] + + output_dir = Path("filtered") + output_dir.mkdir(exist_ok=True) + + # print(rez_quality_threshold) + if len(rez_quality_threshold.keys())>0: + with path_to_write.open("a", encoding="utf-8") as file_w: + for key in rez_quality_threshold.keys(): + file_w.write(key+'\n') + file_w.write(rez_quality_threshold[key][0]+'\n') + file_w.write(seq_plus+'\n') + file_w.write(rez_quality_threshold[key][1]+'\n') + else: + continue + + return None if __name__ == "__main__": From 5202ea328ebdb88e630743d05da0efe5b8d1e075 Mon Sep 17 00:00:00 2001 From: Alex K <777ak888@gmail.com> Date: Thu, 9 Oct 2025 08:10:31 +0000 Subject: [PATCH 03/12] modified: modules/dna_rna_tools.py update complement function for better adequacy --- modules/dna_rna_tools.py | 45 ++++------------------------------------ 1 file changed, 4 insertions(+), 41 deletions(-) diff --git a/modules/dna_rna_tools.py b/modules/dna_rna_tools.py index e107624..384aab2 100644 --- a/modules/dna_rna_tools.py +++ b/modules/dna_rna_tools.py @@ -25,6 +25,8 @@ ValueError: if wrong sequence ''' +compliments_dna = {"A": "T", "a": "t", "T": "A", "t": "a", "G": "C", "g": "c", "C": "G", "c": "g"} +compliments_rna = {"A": "U", "a": "u", "U": "A", "u": "a", "G": "C", "g": "c", "C": "G", "c": "g"} def is_nucleic_acid(seq: str) -> bool: ''' @@ -99,49 +101,10 @@ def complement(seq: str) -> str: ''' if not is_nucleic_acid(seq): raise ValueError(f"{seq} is not DNA or RNA sqeuence!") - rez = [] - if is_dna(seq): - for n in seq: - match n: - case 'a': - rez.append('t') - case 'A': - rez.append('T') - case 't': - rez.append('a') - case 'T': - rez.append('A') - case 'c': - rez.append('g') - case 'C': - rez.append('G') - case 'g': - rez.append('c') - case 'G': - rez.append('C') - - if is_rna(seq): - for n in seq: - match n: - case 'a': - rez.append('u') - case 'A': - rez.append('U') - case 'u': - rez.append('a') - case 'U': - rez.append('A') - case 'c': - rez.append('g') - case 'C': - rez.append('G') - case 'g': - rez.append('c') - case 'G': - rez.append('C') + if is_dna(seq): return "".join([compliments_dna[n] for n in seq]) - return ''.join(rez) + if is_rna(seq): return "".join([compliments_rna[n] for n in seq]) def reverse_complement(seq: str) -> str: From 8cef00a0020effd4ed359a9a82851e24954a61fe Mon Sep 17 00:00:00 2001 From: Alex K <777ak888@gmail.com> Date: Thu, 9 Oct 2025 09:34:59 +0000 Subject: [PATCH 04/12] modified: main.py modified: modules/fastq_tools.py writing filtered sequence to a file is wrapped in a function --- main.py | 14 ++++++++------ modules/fastq_tools.py | 30 ++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 6 deletions(-) diff --git a/main.py b/main.py index b99165f..e627070 100644 --- a/main.py +++ b/main.py @@ -132,12 +132,14 @@ def filter_fastq(input_fastq: str, output_fastq: str, gc_bounds: Union[int, tupl # print(rez_quality_threshold) if len(rez_quality_threshold.keys())>0: - with path_to_write.open("a", encoding="utf-8") as file_w: - for key in rez_quality_threshold.keys(): - file_w.write(key+'\n') - file_w.write(rez_quality_threshold[key][0]+'\n') - file_w.write(seq_plus+'\n') - file_w.write(rez_quality_threshold[key][1]+'\n') + rez_quality_threshold['plus'] = seq_plus + modules.fastq_tools.write_seq_to_fle(path_to_write, rez_quality_threshold) + # with path_to_write.open("a", encoding="utf-8") as file_w: + # for key in rez_quality_threshold.keys(): + # file_w.write(key+'\n') + # file_w.write(rez_quality_threshold[key][0]+'\n') + # file_w.write(seq_plus+'\n') + # file_w.write(rez_quality_threshold[key][1]+'\n') else: continue diff --git a/modules/fastq_tools.py b/modules/fastq_tools.py index 7018cca..617434c 100644 --- a/modules/fastq_tools.py +++ b/modules/fastq_tools.py @@ -78,6 +78,36 @@ def average_quality(read: tuple) -> int: return round(seq_score/len(read[0])) +def read_seq_from_file(file_name: str) -> dict: + pass + + +def write_seq_to_fle(file_name: str, seq: dict, ) -> None: + ''' + Writes given seq (dict) of special shape/form to a given file + + Arguments: + seq: dict of a form {id: (seq_read, seq_quality)} + file_name: str of file name to write the dict content + + Returns: + None + in the file with file_name four rows to be writed: + id + seq_read + + + seq_quality + + + Raises: + exceptions if something went wrong + ''' + with file_name.open("a", encoding="utf-8") as file_w: + for key in seq.keys(): + file_w.write(key+'\n') + file_w.write(seq[key][0]+'\n') + file_w.write(seq['plus']+'\n') + file_w.write(seq[key][1]+'\n') if __name__ == "__main__": pass From 9b28f4b8afa7a46a56f20a5a6facfdc0517eea81 Mon Sep 17 00:00:00 2001 From: Alex K <777ak888@gmail.com> Date: Thu, 9 Oct 2025 09:37:12 +0000 Subject: [PATCH 05/12] modified: .gitignore added pycache and .ipynb --- .gitignore | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.gitignore b/.gitignore index 09f64ca..0dd2b90 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,8 @@ */.ipynb_checkpoints/ # jupyter notebook files +*.ipynba *.ipynb + +# __pycache__ +__pycache__ From 4fc148782de2a274c08e949135b94b927f346491 Mon Sep 17 00:00:00 2001 From: Alex K <777ak888@gmail.com> Date: Thu, 9 Oct 2025 11:34:07 +0000 Subject: [PATCH 06/12] modified: main.py modified: modules/fastq_tools.py reading sequence for filtering from a file is wrapped into a function --- main.py | 86 ++++++++++++++++++------------------------ modules/fastq_tools.py | 35 +++++++++++++++-- 2 files changed, 69 insertions(+), 52 deletions(-) diff --git a/main.py b/main.py index e627070..16e17fe 100644 --- a/main.py +++ b/main.py @@ -83,65 +83,53 @@ def filter_fastq(input_fastq: str, output_fastq: str, gc_bounds: Union[int, tupl exceptions if something went wrong. ''' + output_dir = Path("filtered") + output_dir.mkdir(exist_ok=True) + path_to_write = Path("filtered", f"{output_fastq}") if path_to_write.is_file(): is_overwtire = input(f"The file {path_to_write} already exists, want to overwrite it? Y/N") if is_overwtire in {'Y', 'y'}: path_to_write.unlink() else: exit() + + status = True with open(input_fastq, "r") as file: - while True: - seq_id = file.readline() - if not seq_id: - print(f"processing for {input_fastq} finished, filtration results in {path_to_write}") + while status: + seqs = modules.fastq_tools.read_seq_from_file(file) + if not status: + print(f"processing of the {input_fastq} is complete, filtering results are saved in {output_fastq}") break - else: - seq_id = seq_id.strip() - seq_seq = file.readline().strip() - seq_plus = file.readline().strip() - seq_qual = file.readline().strip() - seqs = {seq_id: (seq_seq, seq_qual)} - #print(seqs) - - - if not isinstance(seqs, dict): raise TypeError("seqs must be a dictionary") - if isinstance(gc_bounds, int): gc_bounds = (0, gc_bounds) - if isinstance(length_bounds, int): length_bounds = (0, length_bounds) - - rez_gc_bounds = {} - - for key in seqs.keys(): - #print(gc_bounds[0], seqs[key][0], modules.fastq_tools.gc_count(seqs[key][0])) - if (gc_bounds[0] <= modules.fastq_tools.gc_count(seqs[key][0])) and (modules.fastq_tools.gc_count(seqs[key][0]) <= gc_bounds[1]): - rez_gc_bounds[key] = seqs[key] - rez_length_bounds = {} - - for key in rez_gc_bounds.keys(): - if (length_bounds[0] <= len(seqs[key][0])) and (len(seqs[key][0]) <= length_bounds[1]): - rez_length_bounds[key] = rez_gc_bounds[key] - - rez_quality_threshold = {} - - for key in rez_length_bounds.keys(): - if (modules.fastq_tools.average_quality(seqs[key]) >= quality_threshold): - rez_quality_threshold[key] = rez_length_bounds[key] + if len(seqs.keys()) == 0: + print(f"processing of the {input_fastq} is complete, filtering results are saved in {output_fastq}") + break - output_dir = Path("filtered") - output_dir.mkdir(exist_ok=True) - - # print(rez_quality_threshold) - if len(rez_quality_threshold.keys())>0: - rez_quality_threshold['plus'] = seq_plus - modules.fastq_tools.write_seq_to_fle(path_to_write, rez_quality_threshold) - # with path_to_write.open("a", encoding="utf-8") as file_w: - # for key in rez_quality_threshold.keys(): - # file_w.write(key+'\n') - # file_w.write(rez_quality_threshold[key][0]+'\n') - # file_w.write(seq_plus+'\n') - # file_w.write(rez_quality_threshold[key][1]+'\n') - else: - continue + + if not isinstance(seqs, dict): raise TypeError("seqs must be a dictionary") + if isinstance(gc_bounds, int): gc_bounds = (0, gc_bounds) + if isinstance(length_bounds, int): length_bounds = (0, length_bounds) + + rez_gc_bounds = {} + + for key in seqs.keys(): + if (gc_bounds[0] <= modules.fastq_tools.gc_count(seqs[key][0])) and (modules.fastq_tools.gc_count(seqs[key][0]) <= gc_bounds[1]): + rez_gc_bounds[key] = seqs[key] + + rez_length_bounds = {} + + for key in rez_gc_bounds.keys(): + if (length_bounds[0] <= len(seqs[key][0])) and (len(seqs[key][0]) <= length_bounds[1]): + rez_length_bounds[key] = rez_gc_bounds[key] + + rez_quality_threshold = {} + + for key in rez_length_bounds.keys(): + if (modules.fastq_tools.average_quality(seqs[key]) >= quality_threshold): + rez_quality_threshold[key] = rez_length_bounds[key] + + if len(rez_quality_threshold.keys())>0: modules.fastq_tools.write_seq_to_fle(path_to_write, rez_quality_threshold) + else: continue return None diff --git a/modules/fastq_tools.py b/modules/fastq_tools.py index 617434c..6f940c6 100644 --- a/modules/fastq_tools.py +++ b/modules/fastq_tools.py @@ -13,7 +13,7 @@ ''' from . import dna_rna_tools - +from sys import exit def nucl_count(posl: str, nucl: str) -> int: ''' @@ -79,7 +79,34 @@ def average_quality(read: tuple) -> int: return round(seq_score/len(read[0])) def read_seq_from_file(file_name: str) -> dict: - pass + ''' + Get open file object, read from in a sequence result and make a dictionary of it. + Update the status of file context if needed. + + Arguments: + file_name: opened file object for reading + + Global variable: + status + + Returns: + seq: dict of a form {id: (seq_read, seq_quality, seq_plus)} + + Raises: + exceptions if something went wrong + ''' + global status + seq_id = file_name.readline() + if not seq_id: + status = False + return {} + + else: + seq_id = seq_id.strip() + seq_seq = file_name.readline().strip() + seq_plus = file_name.readline().strip() + seq_qual = file_name.readline().strip() + return {seq_id: (seq_seq, seq_qual, seq_plus)} def write_seq_to_fle(file_name: str, seq: dict, ) -> None: @@ -106,8 +133,10 @@ def write_seq_to_fle(file_name: str, seq: dict, ) -> None: for key in seq.keys(): file_w.write(key+'\n') file_w.write(seq[key][0]+'\n') - file_w.write(seq['plus']+'\n') + file_w.write(seq[key][2]+'\n') file_w.write(seq[key][1]+'\n') + return None + if __name__ == "__main__": pass From 0a736f55d47263f0961f103d51418af779221aa8 Mon Sep 17 00:00:00 2001 From: Alex K <777ak888@gmail.com> Date: Thu, 9 Oct 2025 18:28:11 +0000 Subject: [PATCH 07/12] modified: modules/dna_rna_tools.py update transcribe function for better adequacy --- modules/dna_rna_tools.py | 31 +++---------------------------- 1 file changed, 3 insertions(+), 28 deletions(-) diff --git a/modules/dna_rna_tools.py b/modules/dna_rna_tools.py index 384aab2..78ce6f7 100644 --- a/modules/dna_rna_tools.py +++ b/modules/dna_rna_tools.py @@ -27,6 +27,7 @@ compliments_dna = {"A": "T", "a": "t", "T": "A", "t": "a", "G": "C", "g": "c", "C": "G", "c": "g"} compliments_rna = {"A": "U", "a": "u", "U": "A", "u": "a", "G": "C", "g": "c", "C": "G", "c": "g"} +trinscribets = {"A": "A", "a": "a", "T": "U", "t": "u", "G": "G", "g": "g", "C": "C", "c": "c"} def is_nucleic_acid(seq: str) -> bool: ''' @@ -58,34 +59,8 @@ def transcribe(seq: str) -> str: ''' Return transcribed RNA from DNA sequence. ''' - if not is_dna(seq): - raise ValueError(f"{seq} is not DNA, can transcribe only DNA.") - rez = [] - for n in seq: - match n: - case 'a': - rez.append('a') - case 'A': - rez.append('A') - case 't': - rez.append('u') - case 'T': - rez.append('U') - case 'c': - rez.append('c') - case 'C': - rez.append('C') - case 'g': - rez.append('g') - case 'G': - rez.append('G') - case 'u': - rez.append('t') - case 'U': - rez.append('T') - case _: - return "not dna or rna sequence" - return ''.join(rez) + if not is_dna(seq): raise ValueError(f"{seq} is not DNA, can transcribe only DNA.") + else: return "".join([trinscribets[n] for n in seq]) def reverse(seq: str) -> str: From 7fd1088cdde3b73a7b7a3171df90063786a65212 Mon Sep 17 00:00:00 2001 From: Alex K <777ak888@gmail.com> Date: Fri, 10 Oct 2025 18:09:41 +0000 Subject: [PATCH 08/12] modified: bio_files_processor.py added convert_multiline_fasta_to_oneline --- bio_files_processor.py | 77 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 74 insertions(+), 3 deletions(-) diff --git a/bio_files_processor.py b/bio_files_processor.py index 28cb40c..182e886 100644 --- a/bio_files_processor.py +++ b/bio_files_processor.py @@ -1,5 +1,76 @@ -def convert_multiline_fasta_to_oneline(): - pass +''' + Functions for reading bioinformatics files. + + convert_multiline_fasta_to_oneline: reads a fasta file supplied as input, + in which the sequence (DNA/RNA/protein/etc.) can be split into several lines, + and then saves it into a new fasta file in which each sequence fits one line. + + parse_blast_output: function reads the specified TXT file, + and for each QUERY request (paragraph "Sequences producing significant alignments:"), + selects the first row from the Description column. + The resulting set of proteins is saved to a new file in a single column, sorted alphabetically. + +''' + +from pathlib import Path + +def convert_multiline_fasta_to_oneline(input_fasta: str) -> None: + ''' + A function processes fasta file: reads a fasta file supplied as input_fasta (str), + where one sequence (DNA/RNA/protein/etc.) can be split into several lines, + and then saves it into a new fasta file (output_fasta) where each sequence fits one line. + + Input: + input_fasta: file with the input sequences + + Arguments: + input_fasta: str + + Returns: + output_fasta: a file where each sequence fits one line + + Raises: + exceptions if something went wrong. + ''' + + path_to_write = Path(f"output_{input_fasta}") + + with open(input_fasta, "r") as file: + rez = {} + line = file.readline() + while line: + if line and line[0] == '>': + key_line = line.strip() + rez[key_line] = [] + n_line = file.readline().strip() + while n_line and n_line[0] != '>': + if n_line: + rez[key_line].append(n_line) + n_line = file.readline().strip() + line = n_line + else: + line = file.readline().strip() + + with open(path_to_write, "w") as file_w: + for k in rez.keys(): + file_w.write(k+'\n') + file_w.write(''.join(rez[k])+'\n') def parse_blast_output(): - pass + ''' + A function processes fasta file: reads a fasta file supplied as input_fasta (str), + where one sequence (DNA/RNA/protein/...) can be split into several lines, + and then saves it into a new fasta file (output_fasta) where each sequence fits one line. + + Input: + input_fasta: file with the input sequences + + Arguments: + input_fasta: str + + Returns: + output_fasta: a file where each sequence fits one line + + Raises: + exceptions if something went wrong. + ''' From c3eb0b9c957d76155f61e26a2217fd494cc3bc58 Mon Sep 17 00:00:00 2001 From: Alex K <777ak888@gmail.com> Date: Sun, 12 Oct 2025 20:33:52 +0000 Subject: [PATCH 09/12] modified: README.md added info about bio_files_processor.py and its functions --- README.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/README.md b/README.md index 852b89a..4340a2f 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,18 @@ fastq_dict = { filter_fastq(fastq_dict, 100, 9, 37) ``` +### bio_files_processor + +#### convert_multiline_fasta_to_oneline: + reads a fasta file supplied as input, + in which the sequence (DNA/RNA/protein/etc.) can be split into several lines, + and then saves it into a new fasta file in which each sequence fits one line. + +#### select_genes_from_gbk_to_fasta: + function receives a GBK-file as input, + extracts the specified number of genes before and after each gene of interest (gene), + and saves their protein sequence (translation) to a fasta file. + ### Error Handling Functions raise appropriate exceptions for invalid inputs. From cdcc6a9a86d153851cde793f629518fc9970cd94 Mon Sep 17 00:00:00 2001 From: Alex K <777ak888@gmail.com> Date: Sun, 12 Oct 2025 20:35:09 +0000 Subject: [PATCH 10/12] modified: modules/fastq_tools.py added functions for bio_files_processor: write_genes_seq_to_fasta and find_genes_with_neighbors --- modules/fastq_tools.py | 64 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/modules/fastq_tools.py b/modules/fastq_tools.py index 6f940c6..afd9ab7 100644 --- a/modules/fastq_tools.py +++ b/modules/fastq_tools.py @@ -14,6 +14,7 @@ from . import dna_rna_tools from sys import exit +from typing import Union def nucl_count(posl: str, nucl: str) -> int: ''' @@ -138,5 +139,68 @@ def write_seq_to_fle(file_name: str, seq: dict, ) -> None: return None +def write_genes_seq_to_fasta(genes_data: dict, output_file: str): + """ + Writes genes to a FASTA file. + + Arguments: + genes_data: dictionary with gene data + output_file: name of the output FASTA file + + Returns: + None + + """ + + with open(output_file, 'w', encoding='utf-8') as file: + for gene_num, gene_info in genes_data.items(): + header = f">gene_{gene_num}" + if gene_info['gene']: + header += f"|name_{gene_info['gene']}" + + file.write(header + "\n") + file.write(gene_info['translation'] + "\n\n") + + return None + +def find_genes_with_neighbors(genes_dict: dict, genes: Union[int, tuple, list], n_before: int = 1, n_after: int = 1) -> dict: + """ + Finds genes of interest and their neighbors in a dictionary. + + Arguments: + genes_dict: dictionary with genes {n: {'gene': name, 'translation': seq}} + genes: gene numbers to search (int, tuple, or list) + n_before: how many genes before the target gene to include + n_after: how many genes after the target gene to include + + Returns: + dict: dictionary with found genes and their neighbors + + """ + rez = {} + + if isinstance(genes, int): + target_genes = [genes] + elif isinstance(genes, tuple): + target_genes = list(genes) + else: + target_genes = genes + + genes_to_include = set() + + for gene_num in target_genes: + if gene_num in genes_dict: + start = max(1, gene_num - n_before) + end = gene_num + n_after + + for i in range(start, end + 1): + if i in genes_dict: + genes_to_include.add(i) + + for gene_num in sorted(genes_to_include): + rez[gene_num] = genes_dict[gene_num] + + return rez + if __name__ == "__main__": pass From a8cf30a48376ee1d5f1485861ebac5d642415014 Mon Sep 17 00:00:00 2001 From: Alex K <777ak888@gmail.com> Date: Sun, 12 Oct 2025 20:54:49 +0000 Subject: [PATCH 11/12] modified: bio_files_processor.py added parse_blast_output --- bio_files_processor.py | 128 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 116 insertions(+), 12 deletions(-) diff --git a/bio_files_processor.py b/bio_files_processor.py index 182e886..f4fce48 100644 --- a/bio_files_processor.py +++ b/bio_files_processor.py @@ -5,14 +5,16 @@ in which the sequence (DNA/RNA/protein/etc.) can be split into several lines, and then saves it into a new fasta file in which each sequence fits one line. - parse_blast_output: function reads the specified TXT file, - and for each QUERY request (paragraph "Sequences producing significant alignments:"), - selects the first row from the Description column. - The resulting set of proteins is saved to a new file in a single column, sorted alphabetically. + select_genes_from_gbk_to_fasta: function receives a GBK-file as input, + extracts the specified number of genes before and after each gene of interest (gene), + and saves their protein sequence (translation) to a fasta file. ''' from pathlib import Path +from typing import Union +import json +import modules.fastq_tools def convert_multiline_fasta_to_oneline(input_fasta: str) -> None: ''' @@ -56,17 +58,16 @@ def convert_multiline_fasta_to_oneline(input_fasta: str) -> None: file_w.write(k+'\n') file_w.write(''.join(rez[k])+'\n') -def parse_blast_output(): +def parse_blast_output(input_gbk: str, genes: Union[int, tuple, list], output_fasta: str, n_before: int = 1, n_after: int = 1): ''' - A function processes fasta file: reads a fasta file supplied as input_fasta (str), - where one sequence (DNA/RNA/protein/...) can be split into several lines, - and then saves it into a new fasta file (output_fasta) where each sequence fits one line. - - Input: - input_fasta: file with the input sequences + Receives a GBK-file as input, extracts the specified number of genes before and after each gene of interest (gene), + and saves their protein sequence (translation) to a fasta file. Arguments: - input_fasta: str + input_gbk: path to the input GBK file + genes: genes of interest, near which neighbors are searched (string or collection of strings). + n_before, n_after: number of genes before and after (>=1) + output_fasta: output file name. Returns: output_fasta: a file where each sequence fits one line @@ -74,3 +75,106 @@ def parse_blast_output(): Raises: exceptions if something went wrong. ''' + + genes_parsed = {} + gene_count = 0 + current_gene = None + current_translation = "" + in_translation = False + in_cds = False + path_to_write = Path(f"output_{input_gbk.split('.')[0]}") + + with open(input_gbk, 'r', encoding='utf-8') as file: + for line in file: + line = line.rstrip() + + if line.startswith(' CDS'): + if in_cds and current_gene is not None and current_translation: + gene_count += 1 + genes_parsed[gene_count] = { + 'gene': current_gene, + 'translation': current_translation + } + + in_cds = True + current_gene = None + current_translation = "" + in_translation = False + continue + + if in_cds: + if line and not line.startswith(' '): + if current_gene is not None and current_translation: + gene_count += 1 + genes_parsed[gene_count] = { + 'gene': current_gene, + 'translation': current_translation + } + + in_cds = line.startswith(' CDS') + current_gene = None + current_translation = "" + in_translation = False + if in_cds: + continue + + if '/gene=' in line and current_gene is None: + gene_part = line.split('/gene=')[1].strip() + if gene_part.startswith('"'): + current_gene = gene_part.split('"')[1] + else: + current_gene = gene_part.split()[0].strip('"') + + elif '/translation=' in line: + in_translation = True + translation_part = line.split('/translation=')[1].strip() + if translation_part.startswith('"'): + translation_part = translation_part[1:] + if translation_part.endswith('"'): + translation_part = translation_part[:-1] + in_translation = False + + current_translation = translation_part + + elif in_translation: + clean_line = line.strip() + + if '"' in clean_line: + clean_line = clean_line.split('"')[0] + in_translation = False + + if clean_line.startswith('/'): + in_translation = False + else: + current_translation += clean_line + + if in_cds and current_gene is not None and current_translation: + gene_count += 1 + genes_parsed[gene_count] = { + 'gene': current_gene, + 'translation': current_translation + } + + #print(genes_parsed) + + # For future use: + # to avoid new-parsing the if the data is already collected in a convenient form (dictionary) + with open(f'{input_gbk.split(".")[0]}.json', 'w', encoding='utf-8') as f: + json.dump(genes_parsed, f, ensure_ascii=False, indent=4) + + genes_of_interests = modules.fastq_tools.find_genes_with_neighbors(genes_parsed, genes, n_before, n_after) + + modules.fastq_tools.write_genes_seq_to_fasta(genes_of_interests, output_fasta) + + return None + + + + + + + + + + + \ No newline at end of file From 33345a36d2a7b92e2cd834633abf2093060b7e80 Mon Sep 17 00:00:00 2001 From: Alex K <777ak888@gmail.com> Date: Wed, 22 Oct 2025 11:23:10 +0000 Subject: [PATCH 12/12] modified: bio_files_processor.py modified: modules/fastq_tools.py bug fixed in output file creation --- bio_files_processor.py | 13 +++++++++---- modules/fastq_tools.py | 40 +++++++++++++++++----------------------- 2 files changed, 26 insertions(+), 27 deletions(-) diff --git a/bio_files_processor.py b/bio_files_processor.py index f4fce48..851c36b 100644 --- a/bio_files_processor.py +++ b/bio_files_processor.py @@ -91,7 +91,8 @@ def parse_blast_output(input_gbk: str, genes: Union[int, tuple, list], output_fa if line.startswith(' CDS'): if in_cds and current_gene is not None and current_translation: gene_count += 1 - genes_parsed[gene_count] = { + genes_parsed[current_gene] = { + 'gene_count': gene_count, 'gene': current_gene, 'translation': current_translation } @@ -106,7 +107,8 @@ def parse_blast_output(input_gbk: str, genes: Union[int, tuple, list], output_fa if line and not line.startswith(' '): if current_gene is not None and current_translation: gene_count += 1 - genes_parsed[gene_count] = { + genes_parsed[current_gene] = { + 'gene_count': gene_count, 'gene': current_gene, 'translation': current_translation } @@ -150,7 +152,8 @@ def parse_blast_output(input_gbk: str, genes: Union[int, tuple, list], output_fa if in_cds and current_gene is not None and current_translation: gene_count += 1 - genes_parsed[gene_count] = { + genes_parsed[current_gene] = { + 'gene_count': gene_count, 'gene': current_gene, 'translation': current_translation } @@ -162,9 +165,11 @@ def parse_blast_output(input_gbk: str, genes: Union[int, tuple, list], output_fa with open(f'{input_gbk.split(".")[0]}.json', 'w', encoding='utf-8') as f: json.dump(genes_parsed, f, ensure_ascii=False, indent=4) + print(f'saved human readable version of {input_gbk} in {input_gbk.split(".")[0]}.json') + genes_of_interests = modules.fastq_tools.find_genes_with_neighbors(genes_parsed, genes, n_before, n_after) - modules.fastq_tools.write_genes_seq_to_fasta(genes_of_interests, output_fasta) + modules.fastq_tools.write_genes_seq_to_fasta(genes_of_interests, path_to_write) return None diff --git a/modules/fastq_tools.py b/modules/fastq_tools.py index afd9ab7..786a869 100644 --- a/modules/fastq_tools.py +++ b/modules/fastq_tools.py @@ -163,12 +163,12 @@ def write_genes_seq_to_fasta(genes_data: dict, output_file: str): return None -def find_genes_with_neighbors(genes_dict: dict, genes: Union[int, tuple, list], n_before: int = 1, n_after: int = 1) -> dict: +def find_genes_with_neighbors(genes_all: dict, genes: Union[int, tuple, list], n_before: int = 1, n_after: int = 1) -> dict: """ Finds genes of interest and their neighbors in a dictionary. Arguments: - genes_dict: dictionary with genes {n: {'gene': name, 'translation': seq}} + genes_dict: dictionary with genes {'gene_name': {'gene': gene_name, 'gene_count': number, 'translation': seq}} genes: gene numbers to search (int, tuple, or list) n_before: how many genes before the target gene to include n_after: how many genes after the target gene to include @@ -177,29 +177,23 @@ def find_genes_with_neighbors(genes_dict: dict, genes: Union[int, tuple, list], dict: dictionary with found genes and their neighbors """ - rez = {} - - if isinstance(genes, int): - target_genes = [genes] - elif isinstance(genes, tuple): - target_genes = list(genes) - else: - target_genes = genes - - genes_to_include = set() - - for gene_num in target_genes: - if gene_num in genes_dict: - start = max(1, gene_num - n_before) - end = gene_num + n_after - - for i in range(start, end + 1): - if i in genes_dict: - genes_to_include.add(i) + + if isinstance(genes, str): + genes = [genes] - for gene_num in sorted(genes_to_include): - rez[gene_num] = genes_dict[gene_num] + rez = {} + genes_to_check = list(genes_all.keys()) + for gene in genes: + idx = genes_to_check.index(gene) + + left_end = max(idx - n_before, 0) + right_end = min(idx + n_after + 1, len(genes_to_check)) + + for i in range(left_end, right_end): + key = genes_to_check[i] + rez[key] = genes_all[key] + return rez if __name__ == "__main__":