diff --git a/.gitignore b/.gitignore index 09f64ca..0dd2b90 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,8 @@ */.ipynb_checkpoints/ # jupyter notebook files +*.ipynba *.ipynb + +# __pycache__ +__pycache__ diff --git a/README.md b/README.md index 852b89a..4340a2f 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,18 @@ fastq_dict = { filter_fastq(fastq_dict, 100, 9, 37) ``` +### bio_files_processor + +#### convert_multiline_fasta_to_oneline: + reads a fasta file supplied as input, + in which the sequence (DNA/RNA/protein/etc.) can be split into several lines, + and then saves it into a new fasta file in which each sequence fits one line. + +#### select_genes_from_gbk_to_fasta: + function receives a GBK-file as input, + extracts the specified number of genes before and after each gene of interest (gene), + and saves their protein sequence (translation) to a fasta file. + ### Error Handling Functions raise appropriate exceptions for invalid inputs. diff --git a/bio_files_processor.py b/bio_files_processor.py new file mode 100644 index 0000000..851c36b --- /dev/null +++ b/bio_files_processor.py @@ -0,0 +1,185 @@ +''' + Functions for reading bioinformatics files. + + convert_multiline_fasta_to_oneline: reads a fasta file supplied as input, + in which the sequence (DNA/RNA/protein/etc.) can be split into several lines, + and then saves it into a new fasta file in which each sequence fits one line. + + select_genes_from_gbk_to_fasta: function receives a GBK-file as input, + extracts the specified number of genes before and after each gene of interest (gene), + and saves their protein sequence (translation) to a fasta file. + +''' + +from pathlib import Path +from typing import Union +import json +import modules.fastq_tools + +def convert_multiline_fasta_to_oneline(input_fasta: str) -> None: + ''' + A function processes fasta file: reads a fasta file supplied as input_fasta (str), + where one sequence (DNA/RNA/protein/etc.) can be split into several lines, + and then saves it into a new fasta file (output_fasta) where each sequence fits one line. + + Input: + input_fasta: file with the input sequences + + Arguments: + input_fasta: str + + Returns: + output_fasta: a file where each sequence fits one line + + Raises: + exceptions if something went wrong. + ''' + + path_to_write = Path(f"output_{input_fasta}") + + with open(input_fasta, "r") as file: + rez = {} + line = file.readline() + while line: + if line and line[0] == '>': + key_line = line.strip() + rez[key_line] = [] + n_line = file.readline().strip() + while n_line and n_line[0] != '>': + if n_line: + rez[key_line].append(n_line) + n_line = file.readline().strip() + line = n_line + else: + line = file.readline().strip() + + with open(path_to_write, "w") as file_w: + for k in rez.keys(): + file_w.write(k+'\n') + file_w.write(''.join(rez[k])+'\n') + +def parse_blast_output(input_gbk: str, genes: Union[int, tuple, list], output_fasta: str, n_before: int = 1, n_after: int = 1): + ''' + Receives a GBK-file as input, extracts the specified number of genes before and after each gene of interest (gene), + and saves their protein sequence (translation) to a fasta file. + + Arguments: + input_gbk: path to the input GBK file + genes: genes of interest, near which neighbors are searched (string or collection of strings). + n_before, n_after: number of genes before and after (>=1) + output_fasta: output file name. + + Returns: + output_fasta: a file where each sequence fits one line + + Raises: + exceptions if something went wrong. + ''' + + genes_parsed = {} + gene_count = 0 + current_gene = None + current_translation = "" + in_translation = False + in_cds = False + path_to_write = Path(f"output_{input_gbk.split('.')[0]}") + + with open(input_gbk, 'r', encoding='utf-8') as file: + for line in file: + line = line.rstrip() + + if line.startswith(' CDS'): + if in_cds and current_gene is not None and current_translation: + gene_count += 1 + genes_parsed[current_gene] = { + 'gene_count': gene_count, + 'gene': current_gene, + 'translation': current_translation + } + + in_cds = True + current_gene = None + current_translation = "" + in_translation = False + continue + + if in_cds: + if line and not line.startswith(' '): + if current_gene is not None and current_translation: + gene_count += 1 + genes_parsed[current_gene] = { + 'gene_count': gene_count, + 'gene': current_gene, + 'translation': current_translation + } + + in_cds = line.startswith(' CDS') + current_gene = None + current_translation = "" + in_translation = False + if in_cds: + continue + + if '/gene=' in line and current_gene is None: + gene_part = line.split('/gene=')[1].strip() + if gene_part.startswith('"'): + current_gene = gene_part.split('"')[1] + else: + current_gene = gene_part.split()[0].strip('"') + + elif '/translation=' in line: + in_translation = True + translation_part = line.split('/translation=')[1].strip() + if translation_part.startswith('"'): + translation_part = translation_part[1:] + if translation_part.endswith('"'): + translation_part = translation_part[:-1] + in_translation = False + + current_translation = translation_part + + elif in_translation: + clean_line = line.strip() + + if '"' in clean_line: + clean_line = clean_line.split('"')[0] + in_translation = False + + if clean_line.startswith('/'): + in_translation = False + else: + current_translation += clean_line + + if in_cds and current_gene is not None and current_translation: + gene_count += 1 + genes_parsed[current_gene] = { + 'gene_count': gene_count, + 'gene': current_gene, + 'translation': current_translation + } + + #print(genes_parsed) + + # For future use: + # to avoid new-parsing the if the data is already collected in a convenient form (dictionary) + with open(f'{input_gbk.split(".")[0]}.json', 'w', encoding='utf-8') as f: + json.dump(genes_parsed, f, ensure_ascii=False, indent=4) + + print(f'saved human readable version of {input_gbk} in {input_gbk.split(".")[0]}.json') + + genes_of_interests = modules.fastq_tools.find_genes_with_neighbors(genes_parsed, genes, n_before, n_after) + + modules.fastq_tools.write_genes_seq_to_fasta(genes_of_interests, path_to_write) + + return None + + + + + + + + + + + \ No newline at end of file diff --git a/main.py b/main.py index 5c9a141..16e17fe 100644 --- a/main.py +++ b/main.py @@ -1,4 +1,7 @@ from typing import Union +from os import path +from sys import exit +from pathlib import Path import modules.dna_rna_tools import modules.fastq_tools @@ -55,20 +58,24 @@ def run_dna_rna_tools(*seq_data): return rez -def filter_fastq(seqs: dict, gc_bounds: Union[int, tuple] = (0, 100), length_bounds: Union[int, tuple] = (0, 2**32), quality_threshold: int = 0) -> dict: +def filter_fastq(input_fastq: str, output_fastq: str, gc_bounds: Union[int, tuple] = (0, 100), length_bounds: Union[int, tuple] = (0, 2**32), quality_threshold: int = 0) -> None: ''' A function working with fastq sequences. All bounds is included. Quality in Phred33. Input: - seqs: dict of a fastq sequences key: sequence_name string, value: tupple: (sequence: str, quality: str) + input_fastq: file with the input sequences + output_fastq: file to store filtered sequences a special directory "filtered" will be created for it Arguments: gc_bounds: tuple = (0, 100) # bound included length_bounds: tuple = (0, 2**32) # bound included quality_threshold: int = 0 # in phred33 + Intermediate: + seqs: dict of a fastq sequences key: sequence_name string, value: tupple: (sequence: str, quality: str) + Returns: dictionary consisting only of sequences that satisfy all conditions. @@ -76,29 +83,55 @@ def filter_fastq(seqs: dict, gc_bounds: Union[int, tuple] = (0, 100), length_bou exceptions if something went wrong. ''' - if not isinstance(seqs, dict): raise TypeError("seqs must be a dictionary") - if isinstance(gc_bounds, int): gc_bounds = (0, gc_bounds) - if isinstance(length_bounds, int): length_bounds = (0, length_bounds) - - rez_gc_bounds = {} - - for key in seqs.keys(): - if (gc_bounds[0] <= modules.fastq_tools.gc_count(seqs[key][0])) and (modules.fastq_tools.gc_count(seqs[key][0]) <= gc_bounds[1]): - rez_gc_bounds[key] = seqs[key] - - rez_length_bounds = {} - - for key in rez_gc_bounds.keys(): - if (length_bounds[0] <= len(seqs[key][0])) and (len(seqs[key][0]) <= length_bounds[1]): - rez_length_bounds[key] = rez_gc_bounds[key] - - rez_quality_threshold = {} - - for key in rez_length_bounds.keys(): - if (modules.fastq_tools.average_quality(seqs[key]) >= quality_threshold): - rez_quality_threshold[key] = rez_length_bounds[key] - - return rez_quality_threshold + output_dir = Path("filtered") + output_dir.mkdir(exist_ok=True) + + path_to_write = Path("filtered", f"{output_fastq}") + if path_to_write.is_file(): + is_overwtire = input(f"The file {path_to_write} already exists, want to overwrite it? Y/N") + if is_overwtire in {'Y', 'y'}: path_to_write.unlink() + else: exit() + + status = True + + with open(input_fastq, "r") as file: + while status: + seqs = modules.fastq_tools.read_seq_from_file(file) + if not status: + print(f"processing of the {input_fastq} is complete, filtering results are saved in {output_fastq}") + break + + if len(seqs.keys()) == 0: + print(f"processing of the {input_fastq} is complete, filtering results are saved in {output_fastq}") + break + + + if not isinstance(seqs, dict): raise TypeError("seqs must be a dictionary") + if isinstance(gc_bounds, int): gc_bounds = (0, gc_bounds) + if isinstance(length_bounds, int): length_bounds = (0, length_bounds) + + rez_gc_bounds = {} + + for key in seqs.keys(): + if (gc_bounds[0] <= modules.fastq_tools.gc_count(seqs[key][0])) and (modules.fastq_tools.gc_count(seqs[key][0]) <= gc_bounds[1]): + rez_gc_bounds[key] = seqs[key] + + rez_length_bounds = {} + + for key in rez_gc_bounds.keys(): + if (length_bounds[0] <= len(seqs[key][0])) and (len(seqs[key][0]) <= length_bounds[1]): + rez_length_bounds[key] = rez_gc_bounds[key] + + rez_quality_threshold = {} + + for key in rez_length_bounds.keys(): + if (modules.fastq_tools.average_quality(seqs[key]) >= quality_threshold): + rez_quality_threshold[key] = rez_length_bounds[key] + + if len(rez_quality_threshold.keys())>0: modules.fastq_tools.write_seq_to_fle(path_to_write, rez_quality_threshold) + else: continue + + return None if __name__ == "__main__": diff --git a/modules/dna_rna_tools.py b/modules/dna_rna_tools.py index e107624..78ce6f7 100644 --- a/modules/dna_rna_tools.py +++ b/modules/dna_rna_tools.py @@ -25,6 +25,9 @@ ValueError: if wrong sequence ''' +compliments_dna = {"A": "T", "a": "t", "T": "A", "t": "a", "G": "C", "g": "c", "C": "G", "c": "g"} +compliments_rna = {"A": "U", "a": "u", "U": "A", "u": "a", "G": "C", "g": "c", "C": "G", "c": "g"} +trinscribets = {"A": "A", "a": "a", "T": "U", "t": "u", "G": "G", "g": "g", "C": "C", "c": "c"} def is_nucleic_acid(seq: str) -> bool: ''' @@ -56,34 +59,8 @@ def transcribe(seq: str) -> str: ''' Return transcribed RNA from DNA sequence. ''' - if not is_dna(seq): - raise ValueError(f"{seq} is not DNA, can transcribe only DNA.") - rez = [] - for n in seq: - match n: - case 'a': - rez.append('a') - case 'A': - rez.append('A') - case 't': - rez.append('u') - case 'T': - rez.append('U') - case 'c': - rez.append('c') - case 'C': - rez.append('C') - case 'g': - rez.append('g') - case 'G': - rez.append('G') - case 'u': - rez.append('t') - case 'U': - rez.append('T') - case _: - return "not dna or rna sequence" - return ''.join(rez) + if not is_dna(seq): raise ValueError(f"{seq} is not DNA, can transcribe only DNA.") + else: return "".join([trinscribets[n] for n in seq]) def reverse(seq: str) -> str: @@ -99,49 +76,10 @@ def complement(seq: str) -> str: ''' if not is_nucleic_acid(seq): raise ValueError(f"{seq} is not DNA or RNA sqeuence!") - rez = [] - - if is_dna(seq): - for n in seq: - match n: - case 'a': - rez.append('t') - case 'A': - rez.append('T') - case 't': - rez.append('a') - case 'T': - rez.append('A') - case 'c': - rez.append('g') - case 'C': - rez.append('G') - case 'g': - rez.append('c') - case 'G': - rez.append('C') - - if is_rna(seq): - for n in seq: - match n: - case 'a': - rez.append('u') - case 'A': - rez.append('U') - case 'u': - rez.append('a') - case 'U': - rez.append('A') - case 'c': - rez.append('g') - case 'C': - rez.append('G') - case 'g': - rez.append('c') - case 'G': - rez.append('C') - - return ''.join(rez) + + if is_dna(seq): return "".join([compliments_dna[n] for n in seq]) + + if is_rna(seq): return "".join([compliments_rna[n] for n in seq]) def reverse_complement(seq: str) -> str: diff --git a/modules/fastq_tools.py b/modules/fastq_tools.py index 7018cca..786a869 100644 --- a/modules/fastq_tools.py +++ b/modules/fastq_tools.py @@ -13,7 +13,8 @@ ''' from . import dna_rna_tools - +from sys import exit +from typing import Union def nucl_count(posl: str, nucl: str) -> int: ''' @@ -78,6 +79,122 @@ def average_quality(read: tuple) -> int: return round(seq_score/len(read[0])) +def read_seq_from_file(file_name: str) -> dict: + ''' + Get open file object, read from in a sequence result and make a dictionary of it. + Update the status of file context if needed. + + Arguments: + file_name: opened file object for reading + + Global variable: + status + + Returns: + seq: dict of a form {id: (seq_read, seq_quality, seq_plus)} + + Raises: + exceptions if something went wrong + ''' + global status + seq_id = file_name.readline() + if not seq_id: + status = False + return {} + + else: + seq_id = seq_id.strip() + seq_seq = file_name.readline().strip() + seq_plus = file_name.readline().strip() + seq_qual = file_name.readline().strip() + return {seq_id: (seq_seq, seq_qual, seq_plus)} + + +def write_seq_to_fle(file_name: str, seq: dict, ) -> None: + ''' + Writes given seq (dict) of special shape/form to a given file + + Arguments: + seq: dict of a form {id: (seq_read, seq_quality)} + file_name: str of file name to write the dict content + + Returns: + None + in the file with file_name four rows to be writed: + id + seq_read + + + seq_quality + + + Raises: + exceptions if something went wrong + ''' + with file_name.open("a", encoding="utf-8") as file_w: + for key in seq.keys(): + file_w.write(key+'\n') + file_w.write(seq[key][0]+'\n') + file_w.write(seq[key][2]+'\n') + file_w.write(seq[key][1]+'\n') + + return None + +def write_genes_seq_to_fasta(genes_data: dict, output_file: str): + """ + Writes genes to a FASTA file. + + Arguments: + genes_data: dictionary with gene data + output_file: name of the output FASTA file + + Returns: + None + + """ + + with open(output_file, 'w', encoding='utf-8') as file: + for gene_num, gene_info in genes_data.items(): + header = f">gene_{gene_num}" + if gene_info['gene']: + header += f"|name_{gene_info['gene']}" + + file.write(header + "\n") + file.write(gene_info['translation'] + "\n\n") + + return None + +def find_genes_with_neighbors(genes_all: dict, genes: Union[int, tuple, list], n_before: int = 1, n_after: int = 1) -> dict: + """ + Finds genes of interest and their neighbors in a dictionary. + + Arguments: + genes_dict: dictionary with genes {'gene_name': {'gene': gene_name, 'gene_count': number, 'translation': seq}} + genes: gene numbers to search (int, tuple, or list) + n_before: how many genes before the target gene to include + n_after: how many genes after the target gene to include + + Returns: + dict: dictionary with found genes and their neighbors + + """ + + if isinstance(genes, str): + genes = [genes] + + rez = {} + genes_to_check = list(genes_all.keys()) + + for gene in genes: + idx = genes_to_check.index(gene) + + left_end = max(idx - n_before, 0) + right_end = min(idx + n_after + 1, len(genes_to_check)) + + for i in range(left_end, right_end): + key = genes_to_check[i] + rez[key] = genes_all[key] + + return rez if __name__ == "__main__": pass