Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 73 additions & 2 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
from Bio import SeqIO
from Bio.SeqUtils import gc_fraction
from parser import parser
import sys
import logging

logging.basicConfig(
filename="logs.log",
filemode="w",
format="{asctime} {levelname} +++ {message}",
datefmt="%Y-%m-%d %H:%M:%S",
style="{",
level=logging.INFO,
force=True,
encoding="utf-8",
)


class BiologicalSequence:
Expand Down Expand Up @@ -150,13 +164,70 @@ def filter_fastq(
) -> None:
if isinstance(gc_bounds, float | int):
gc_bounds = [gc_bounds, 100]
elif isinstance(gc_bounds, tuple):
gc_bounds = [gc_bounds[0], gc_bounds[1]]

if isinstance(length_bounds, float | int):
length_bounds = [length_bounds, 2 ** 32]
elif isinstance(length_bounds, tuple):
length_bounds = [length_bounds[0], length_bounds[1]]

with open(input_fastq, "r") as input_fastq, open(output_fastq, "w") as output_fastq:
for record in SeqIO.parse(input_fastq, "fastq"):
if min(record.letter_annotations["phred_quality"]) >= quality_threshold:
if gc_bounds[0] < gc_fraction(record.seq) * 100 < gc_bounds[1]:
if length_bounds[0] < len(record.seq) < length_bounds[1]:
if gc_bounds[0] <= gc_fraction(record.seq) * 100 <= gc_bounds[1]:
if length_bounds[0] <= len(record.seq) <= length_bounds[1]:
SeqIO.write(record, output_fastq, "fastq")


if __name__ == "__main__":
if len(sys.argv) == 1:
parser.print_help(sys.stderr)
sys.exit(1)

args = parser.parse_args()
input_fastq = args.input_fastq
output_fastq = args.output_fastq
# parse gc_bounds
if args.gc_bounds is None:
gc_bounds = (0, 100)
else:
if len(args.gc_bounds) == 1:
gc_bounds = int(args.gc_bounds[0])
elif len(args.gc_bounds) == 2:
gc_bounds = tuple(filter(lambda x: float(x), args.gc_bounds))
if gc_bounds[0] > gc_bounds[1]:
gc_bounds = (gc_bounds[1], gc_bounds[0])
logging.error(
f"Mistake the order of gc-bounds. New order is {gc_bounds}"
)
else:
raise Exception

# parse length_bounds
if args.length_bounds is None:
length_bounds = (0, 2 ** 32)
else:
if len(args.length_bounds) == 1:
length_bounds = int(args.length_bounds[0])
elif len(args.length_bounds) == 2:
length_bounds = tuple(filter(lambda x: int(x), args.length_bounds))
else:
raise Exception
# parse quality_threshold
if args.quality_threshold is None:
quality_threshold = 0
else:
quality_threshold = int(args.quality_threshold[0])

filter_fastq(input_fastq, output_fastq, gc_bounds, length_bounds, quality_threshold)

logging.info(
f"Fastq filtration with options:\n"
f" - input file: {input_fastq}\n"
f" - output file: {output_fastq}\n"
f" - gc_bounds: {gc_bounds}\n"
f" - length_bounds: {length_bounds}\n"
f" - quality_threshold: {quality_threshold}\n"
"Performed successfully."
)
35 changes: 35 additions & 0 deletions parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import argparse

parser = argparse.ArgumentParser(
prog="FASTQ FILTRATOR",
description="This tool filtrate nucleotide reads from fastq file by gc_content, length and quality treshold.",
epilog="Enjoy FASTQ FILTRATOR tool!",
)

parser.add_argument("input_fastq", type=str, help="Takes path to input fastq file")
parser.add_argument(
"output_fastq", type=str, help="Takes path to output filtrated fastq file"
)
parser.add_argument(
"-gc",
"--gc_bounds",
metavar="%%",
type=float,
nargs="+",
help="Takes one or two floats as the limitation of gc content (in %%). Default from 0 to 100 percentages.",
)
parser.add_argument(
"-lb",
"--length_bounds",
metavar="bp",
type=int,
nargs="+",
help="Takes one or two intagers as the limitation of length of reads. Default from 0 to 2 ** 32 bp.",
)
parser.add_argument(
"-qt",
"--quality_threshold",
type=int,
nargs=1,
help="Takes one intager as the limitation of read quality. Default is 0.",
)
1 change: 1 addition & 0 deletions requirments.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
biopython==1.85
numpy==2.2.3
pytest==8.3.3
Empty file added tests/__init__.py
Empty file.
138 changes: 138 additions & 0 deletions tests/test_fastq_filtrator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import os.path

import pytest

from main import filter_fastq


def get_output_data(output):
result = []
with open(output) as file:
for line in file:
result.append(line.strip("\n"))
return result


@pytest.fixture
def tmp_input():
file_path = "tmp_input.fastq"
with open(file_path, "w") as fastq:
fastq.write("@one\nAAAAA\n+\n@@@@@\n")
fastq.write("@two\nATGC\n+\n@@@@\n")
fastq.write("@three\nGGG\n+\nEEE\n")

yield file_path

if os.path.exists(file_path):
os.remove(file_path)


@pytest.fixture
def tmp_incorrect_fastq_input():
file_path = "tmp_input_incorrect.fastq"
with open(file_path, "w") as fastq:
fastq.write("@incorr\n\n+\n\n") # нет последоват-ти
fastq.write("@corr\nGGG\n+\nFFF\n")

yield file_path

if os.path.exists(file_path):
os.remove(file_path)


@pytest.fixture
def tmp_output():
file_path = "tmp_output.fastq"

yield file_path

if os.path.exists(file_path):
os.remove(file_path)


def test_filter_fastq_input_file_not_exists(tmp_output):
with pytest.raises(FileNotFoundError):
filter_fastq(input_fastq="....", output_fastq=tmp_output)


def test_filter_fastq_file_incorrect(tmp_incorrect_fastq_input, tmp_output):
with pytest.raises(ValueError):
filter_fastq(input_fastq=tmp_incorrect_fastq_input, output_fastq=tmp_output)


def test_filter_fastq_output_exists(tmp_input, tmp_output):
filter_fastq(input_fastq=tmp_input, output_fastq=tmp_output)
assert os.path.exists(tmp_output)


def test_filter_fastq_with_default(tmp_input, tmp_output):
filter_fastq(input_fastq=tmp_input, output_fastq=tmp_output)

expected_result = [
"@one",
"AAAAA",
"+",
"@@@@@",
"@two",
"ATGC",
"+",
"@@@@",
"@three",
"GGG",
"+",
"EEE",
]

assert get_output_data(tmp_output) == expected_result


def test_filter_fastq_by_len(tmp_input, tmp_output):
filter_fastq(input_fastq=tmp_input, output_fastq=tmp_output, length_bounds=5)

expected_result = [
"@one",
"AAAAA",
"+",
"@@@@@",
]

assert get_output_data(tmp_output) == expected_result


def test_filter_fastq_by_gc(tmp_input, tmp_output):
filter_fastq(input_fastq=tmp_input, output_fastq=tmp_output, gc_bounds=(90, 100))

expected_result = [
"@three",
"GGG",
"+",
"EEE",
]

assert get_output_data(tmp_output) == expected_result


def test_filter_fastq_by_gc_only_lower_bound(tmp_input, tmp_output):
filter_fastq(input_fastq=tmp_input, output_fastq=tmp_output, gc_bounds=90)

expected_result = [
"@three",
"GGG",
"+",
"EEE",
]

assert get_output_data(tmp_output) == expected_result


def test_filter_fastq_by_quality(tmp_input, tmp_output):
filter_fastq(input_fastq=tmp_input, output_fastq=tmp_output, quality_threshold=32)

expected_result = [
"@three",
"GGG",
"+",
"EEE",
]

assert get_output_data(tmp_output) == expected_result