Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,8 @@
*/.ipynb_checkpoints/

# jupyter notebook files
*.ipynba
*.ipynb

# __pycache__
__pycache__
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,18 @@ fastq_dict = {
filter_fastq(fastq_dict, 100, 9, 37)
```

### bio_files_processor

#### convert_multiline_fasta_to_oneline:
reads a fasta file supplied as input,
in which the sequence (DNA/RNA/protein/etc.) can be split into several lines,
and then saves it into a new fasta file in which each sequence fits one line.

#### select_genes_from_gbk_to_fasta:
function receives a GBK-file as input,
extracts the specified number of genes before and after each gene of interest (gene),
and saves their protein sequence (translation) to a fasta file.

### Error Handling
Functions raise appropriate exceptions for invalid inputs.

Expand Down
185 changes: 185 additions & 0 deletions bio_files_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
'''
Functions for reading bioinformatics files.

convert_multiline_fasta_to_oneline: reads a fasta file supplied as input,
in which the sequence (DNA/RNA/protein/etc.) can be split into several lines,
and then saves it into a new fasta file in which each sequence fits one line.

select_genes_from_gbk_to_fasta: function receives a GBK-file as input,
extracts the specified number of genes before and after each gene of interest (gene),
and saves their protein sequence (translation) to a fasta file.

'''

from pathlib import Path
from typing import Union
import json
import modules.fastq_tools

def convert_multiline_fasta_to_oneline(input_fasta: str) -> None:
'''
A function processes fasta file: reads a fasta file supplied as input_fasta (str),
where one sequence (DNA/RNA/protein/etc.) can be split into several lines,
and then saves it into a new fasta file (output_fasta) where each sequence fits one line.

Input:
input_fasta: file with the input sequences

Arguments:
input_fasta: str

Returns:
output_fasta: a file where each sequence fits one line

Raises:
exceptions if something went wrong.
'''

path_to_write = Path(f"output_{input_fasta}")

with open(input_fasta, "r") as file:
rez = {}
line = file.readline()
while line:
if line and line[0] == '>':
key_line = line.strip()
rez[key_line] = []
n_line = file.readline().strip()
while n_line and n_line[0] != '>':
if n_line:
rez[key_line].append(n_line)
n_line = file.readline().strip()
line = n_line
else:
line = file.readline().strip()

with open(path_to_write, "w") as file_w:
for k in rez.keys():

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Почему бы сразу не итеррироваться по значениям?

for k, seq in rez.items()

file_w.write(k+'\n')
file_w.write(''.join(rez[k])+'\n')

def parse_blast_output(input_gbk: str, genes: Union[int, tuple, list], output_fasta: str, n_before: int = 1, n_after: int = 1):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Получился какой-то гибрид - две функции в одной. Должны быть две различные функции.

'''
Receives a GBK-file as input, extracts the specified number of genes before and after each gene of interest (gene),
and saves their protein sequence (translation) to a fasta file.

Arguments:
input_gbk: path to the input GBK file
genes: genes of interest, near which neighbors are searched (string or collection of strings).
n_before, n_after: number of genes before and after (>=1)
output_fasta: output file name.

Returns:
output_fasta: a file where each sequence fits one line

Raises:
exceptions if something went wrong.
'''

genes_parsed = {}
gene_count = 0
current_gene = None
current_translation = ""
in_translation = False
in_cds = False
path_to_write = Path(f"output_{input_gbk.split('.')[0]}")

with open(input_gbk, 'r', encoding='utf-8') as file:
for line in file:
line = line.rstrip()

if line.startswith(' CDS'):
if in_cds and current_gene is not None and current_translation:
gene_count += 1
genes_parsed[current_gene] = {
'gene_count': gene_count,
'gene': current_gene,
'translation': current_translation
}

in_cds = True
current_gene = None
current_translation = ""
in_translation = False
continue

if in_cds:
if line and not line.startswith(' '):
if current_gene is not None and current_translation:
gene_count += 1
genes_parsed[current_gene] = {
'gene_count': gene_count,
'gene': current_gene,
'translation': current_translation
}

in_cds = line.startswith(' CDS')

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А если будет меньше пробелов?

current_gene = None
current_translation = ""
in_translation = False
if in_cds:
continue

if '/gene=' in line and current_gene is None:
gene_part = line.split('/gene=')[1].strip()
if gene_part.startswith('"'):
current_gene = gene_part.split('"')[1]
else:
current_gene = gene_part.split()[0].strip('"')

elif '/translation=' in line:
in_translation = True
translation_part = line.split('/translation=')[1].strip()
if translation_part.startswith('"'):
translation_part = translation_part[1:]
if translation_part.endswith('"'):
translation_part = translation_part[:-1]
in_translation = False

current_translation = translation_part

elif in_translation:
clean_line = line.strip()

if '"' in clean_line:
clean_line = clean_line.split('"')[0]
in_translation = False

if clean_line.startswith('/'):
in_translation = False
else:
current_translation += clean_line

if in_cds and current_gene is not None and current_translation:
gene_count += 1
genes_parsed[current_gene] = {
'gene_count': gene_count,
'gene': current_gene,
'translation': current_translation
}

#print(genes_parsed)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Пустой список получился.


# For future use:
# to avoid new-parsing the if the data is already collected in a convenient form (dictionary)
with open(f'{input_gbk.split(".")[0]}.json', 'w', encoding='utf-8') as f:
json.dump(genes_parsed, f, ensure_ascii=False, indent=4)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Можно сохранит просто в формате .txt


print(f'saved human readable version of {input_gbk} in {input_gbk.split(".")[0]}.json')

genes_of_interests = modules.fastq_tools.find_genes_with_neighbors(genes_parsed, genes, n_before, n_after)

modules.fastq_tools.write_genes_seq_to_fasta(genes_of_interests, path_to_write)

return None











83 changes: 58 additions & 25 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from typing import Union
from os import path
from sys import exit
from pathlib import Path
import modules.dna_rna_tools

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Для четкости лучше импортировать конкретные функции, которые в модуле используются.

import modules.fastq_tools

Expand Down Expand Up @@ -55,50 +58,80 @@ def run_dna_rna_tools(*seq_data):
return rez


def filter_fastq(seqs: dict, gc_bounds: Union[int, tuple] = (0, 100), length_bounds: Union[int, tuple] = (0, 2**32), quality_threshold: int = 0) -> dict:
def filter_fastq(input_fastq: str, output_fastq: str, gc_bounds: Union[int, tuple] = (0, 100), length_bounds: Union[int, tuple] = (0, 2**32), quality_threshold: int = 0) -> None:
'''
A function working with fastq sequences.
All bounds is included.
Quality in Phred33.

Input:
seqs: dict of a fastq sequences key: sequence_name string, value: tupple: (sequence: str, quality: str)
input_fastq: file with the input sequences
output_fastq: file to store filtered sequences a special directory "filtered" will be created for it

Arguments:
gc_bounds: tuple = (0, 100) # bound included
length_bounds: tuple = (0, 2**32) # bound included
quality_threshold: int = 0 # in phred33

Intermediate:
seqs: dict of a fastq sequences key: sequence_name string, value: tupple: (sequence: str, quality: str)

Returns:
dictionary consisting only of sequences that satisfy all conditions.

Raises:
exceptions if something went wrong.
'''

if not isinstance(seqs, dict): raise TypeError("seqs must be a dictionary")
if isinstance(gc_bounds, int): gc_bounds = (0, gc_bounds)
if isinstance(length_bounds, int): length_bounds = (0, length_bounds)

rez_gc_bounds = {}

for key in seqs.keys():
if (gc_bounds[0] <= modules.fastq_tools.gc_count(seqs[key][0])) and (modules.fastq_tools.gc_count(seqs[key][0]) <= gc_bounds[1]):
rez_gc_bounds[key] = seqs[key]

rez_length_bounds = {}

for key in rez_gc_bounds.keys():
if (length_bounds[0] <= len(seqs[key][0])) and (len(seqs[key][0]) <= length_bounds[1]):
rez_length_bounds[key] = rez_gc_bounds[key]

rez_quality_threshold = {}

for key in rez_length_bounds.keys():
if (modules.fastq_tools.average_quality(seqs[key]) >= quality_threshold):
rez_quality_threshold[key] = rez_length_bounds[key]

return rez_quality_threshold
output_dir = Path("filtered")
output_dir.mkdir(exist_ok=True)

path_to_write = Path("filtered", f"{output_fastq}")
if path_to_write.is_file():
is_overwtire = input(f"The file {path_to_write} already exists, want to overwrite it? Y/N")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Лучше такие побочные эффекты не прописывать в модулях - представим, что мы хотим использовать какую-то функцию, а программа вдруг зависает и ждет от нас какие-то значения для ввода. Лучше вызывать ошибку или предупреждение в таком случае (например, использование warnings.warn). Cкрытые побочные эффекты являются частой причиной сложно отлаживаемых ошибок в коде

if is_overwtire in {'Y', 'y'}: path_to_write.unlink()
else: exit()

status = True

with open(input_fastq, "r") as file:
while status:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Всегда осторожно с такими циклами - легко улететь в бесконечный.

seqs = modules.fastq_tools.read_seq_from_file(file)
if not status:
print(f"processing of the {input_fastq} is complete, filtering results are saved in {output_fastq}")
break

if len(seqs.keys()) == 0:
print(f"processing of the {input_fastq} is complete, filtering results are saved in {output_fastq}")
break


if not isinstance(seqs, dict): raise TypeError("seqs must be a dictionary")
if isinstance(gc_bounds, int): gc_bounds = (0, gc_bounds)
if isinstance(length_bounds, int): length_bounds = (0, length_bounds)

rez_gc_bounds = {}

for key in seqs.keys():

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

В питоне есть такое понятие: Single Responsibility Principle. Суть: у каждого класса, модуля или функции должна быть только одна причина для изменения. То есть, каждый компонент системы должен отвечать за одну конкретную задачу или ответственность - иначе, усложняется код и затрудняется его поддержка и тестирование.

if (gc_bounds[0] <= modules.fastq_tools.gc_count(seqs[key][0])) and (modules.fastq_tools.gc_count(seqs[key][0]) <= gc_bounds[1]):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Почему бы не импортировать сразу модуль fastq_tools?
Код очень сложно читаем.

rez_gc_bounds[key] = seqs[key]

rez_length_bounds = {}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Супер неоптимально - мы записываем в три разных переменных результаты проверки одной последовательности. Лучше сразу просто проверить последовательность на все фильтры.


for key in rez_gc_bounds.keys():
if (length_bounds[0] <= len(seqs[key][0])) and (len(seqs[key][0]) <= length_bounds[1]):
rez_length_bounds[key] = rez_gc_bounds[key]

rez_quality_threshold = {}

for key in rez_length_bounds.keys():
if (modules.fastq_tools.average_quality(seqs[key]) >= quality_threshold):
rez_quality_threshold[key] = rez_length_bounds[key]

if len(rez_quality_threshold.keys())>0: modules.fastq_tools.write_seq_to_fle(path_to_write, rez_quality_threshold)
else: continue

return None


if __name__ == "__main__":
Expand Down
Loading