diff --git a/main.py b/main.py index e3c7ddd..25baa74 100644 --- a/main.py +++ b/main.py @@ -1,5 +1,19 @@ from Bio import SeqIO from Bio.SeqUtils import gc_fraction +from parser import parser +import sys +import logging + +logging.basicConfig( + filename="logs.log", + filemode="w", + format="{asctime} {levelname} +++ {message}", + datefmt="%Y-%m-%d %H:%M:%S", + style="{", + level=logging.INFO, + force=True, + encoding="utf-8", +) class BiologicalSequence: @@ -150,13 +164,70 @@ def filter_fastq( ) -> None: if isinstance(gc_bounds, float | int): gc_bounds = [gc_bounds, 100] + elif isinstance(gc_bounds, tuple): + gc_bounds = [gc_bounds[0], gc_bounds[1]] if isinstance(length_bounds, float | int): length_bounds = [length_bounds, 2 ** 32] + elif isinstance(length_bounds, tuple): + length_bounds = [length_bounds[0], length_bounds[1]] with open(input_fastq, "r") as input_fastq, open(output_fastq, "w") as output_fastq: for record in SeqIO.parse(input_fastq, "fastq"): if min(record.letter_annotations["phred_quality"]) >= quality_threshold: - if gc_bounds[0] < gc_fraction(record.seq) * 100 < gc_bounds[1]: - if length_bounds[0] < len(record.seq) < length_bounds[1]: + if gc_bounds[0] <= gc_fraction(record.seq) * 100 <= gc_bounds[1]: + if length_bounds[0] <= len(record.seq) <= length_bounds[1]: SeqIO.write(record, output_fastq, "fastq") + + +if __name__ == "__main__": + if len(sys.argv) == 1: + parser.print_help(sys.stderr) + sys.exit(1) + + args = parser.parse_args() + input_fastq = args.input_fastq + output_fastq = args.output_fastq + # parse gc_bounds + if args.gc_bounds is None: + gc_bounds = (0, 100) + else: + if len(args.gc_bounds) == 1: + gc_bounds = int(args.gc_bounds[0]) + elif len(args.gc_bounds) == 2: + gc_bounds = tuple(filter(lambda x: float(x), args.gc_bounds)) + if gc_bounds[0] > gc_bounds[1]: + gc_bounds = (gc_bounds[1], gc_bounds[0]) + logging.error( + f"Mistake the order of gc-bounds. New order is {gc_bounds}" + ) + else: + raise Exception + + # parse length_bounds + if args.length_bounds is None: + length_bounds = (0, 2 ** 32) + else: + if len(args.length_bounds) == 1: + length_bounds = int(args.length_bounds[0]) + elif len(args.length_bounds) == 2: + length_bounds = tuple(filter(lambda x: int(x), args.length_bounds)) + else: + raise Exception + # parse quality_threshold + if args.quality_threshold is None: + quality_threshold = 0 + else: + quality_threshold = int(args.quality_threshold[0]) + + filter_fastq(input_fastq, output_fastq, gc_bounds, length_bounds, quality_threshold) + + logging.info( + f"Fastq filtration with options:\n" + f" - input file: {input_fastq}\n" + f" - output file: {output_fastq}\n" + f" - gc_bounds: {gc_bounds}\n" + f" - length_bounds: {length_bounds}\n" + f" - quality_threshold: {quality_threshold}\n" + "Performed successfully." + ) diff --git a/parser.py b/parser.py new file mode 100644 index 0000000..23f1142 --- /dev/null +++ b/parser.py @@ -0,0 +1,35 @@ +import argparse + +parser = argparse.ArgumentParser( + prog="FASTQ FILTRATOR", + description="This tool filtrate nucleotide reads from fastq file by gc_content, length and quality treshold.", + epilog="Enjoy FASTQ FILTRATOR tool!", +) + +parser.add_argument("input_fastq", type=str, help="Takes path to input fastq file") +parser.add_argument( + "output_fastq", type=str, help="Takes path to output filtrated fastq file" +) +parser.add_argument( + "-gc", + "--gc_bounds", + metavar="%%", + type=float, + nargs="+", + help="Takes one or two floats as the limitation of gc content (in %%). Default from 0 to 100 percentages.", +) +parser.add_argument( + "-lb", + "--length_bounds", + metavar="bp", + type=int, + nargs="+", + help="Takes one or two intagers as the limitation of length of reads. Default from 0 to 2 ** 32 bp.", +) +parser.add_argument( + "-qt", + "--quality_threshold", + type=int, + nargs=1, + help="Takes one intager as the limitation of read quality. Default is 0.", +) diff --git a/requirments.txt b/requirments.txt index 0428672..3ea7050 100644 --- a/requirments.txt +++ b/requirments.txt @@ -1,2 +1,3 @@ biopython==1.85 numpy==2.2.3 +pytest==8.3.3 diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_fastq_filtrator.py b/tests/test_fastq_filtrator.py new file mode 100644 index 0000000..39c9ce9 --- /dev/null +++ b/tests/test_fastq_filtrator.py @@ -0,0 +1,138 @@ +import os.path + +import pytest + +from main import filter_fastq + + +def get_output_data(output): + result = [] + with open(output) as file: + for line in file: + result.append(line.strip("\n")) + return result + + +@pytest.fixture +def tmp_input(): + file_path = "tmp_input.fastq" + with open(file_path, "w") as fastq: + fastq.write("@one\nAAAAA\n+\n@@@@@\n") + fastq.write("@two\nATGC\n+\n@@@@\n") + fastq.write("@three\nGGG\n+\nEEE\n") + + yield file_path + + if os.path.exists(file_path): + os.remove(file_path) + + +@pytest.fixture +def tmp_incorrect_fastq_input(): + file_path = "tmp_input_incorrect.fastq" + with open(file_path, "w") as fastq: + fastq.write("@incorr\n\n+\n\n") # нет последоват-ти + fastq.write("@corr\nGGG\n+\nFFF\n") + + yield file_path + + if os.path.exists(file_path): + os.remove(file_path) + + +@pytest.fixture +def tmp_output(): + file_path = "tmp_output.fastq" + + yield file_path + + if os.path.exists(file_path): + os.remove(file_path) + + +def test_filter_fastq_input_file_not_exists(tmp_output): + with pytest.raises(FileNotFoundError): + filter_fastq(input_fastq="....", output_fastq=tmp_output) + + +def test_filter_fastq_file_incorrect(tmp_incorrect_fastq_input, tmp_output): + with pytest.raises(ValueError): + filter_fastq(input_fastq=tmp_incorrect_fastq_input, output_fastq=tmp_output) + + +def test_filter_fastq_output_exists(tmp_input, tmp_output): + filter_fastq(input_fastq=tmp_input, output_fastq=tmp_output) + assert os.path.exists(tmp_output) + + +def test_filter_fastq_with_default(tmp_input, tmp_output): + filter_fastq(input_fastq=tmp_input, output_fastq=tmp_output) + + expected_result = [ + "@one", + "AAAAA", + "+", + "@@@@@", + "@two", + "ATGC", + "+", + "@@@@", + "@three", + "GGG", + "+", + "EEE", + ] + + assert get_output_data(tmp_output) == expected_result + + +def test_filter_fastq_by_len(tmp_input, tmp_output): + filter_fastq(input_fastq=tmp_input, output_fastq=tmp_output, length_bounds=5) + + expected_result = [ + "@one", + "AAAAA", + "+", + "@@@@@", + ] + + assert get_output_data(tmp_output) == expected_result + + +def test_filter_fastq_by_gc(tmp_input, tmp_output): + filter_fastq(input_fastq=tmp_input, output_fastq=tmp_output, gc_bounds=(90, 100)) + + expected_result = [ + "@three", + "GGG", + "+", + "EEE", + ] + + assert get_output_data(tmp_output) == expected_result + + +def test_filter_fastq_by_gc_only_lower_bound(tmp_input, tmp_output): + filter_fastq(input_fastq=tmp_input, output_fastq=tmp_output, gc_bounds=90) + + expected_result = [ + "@three", + "GGG", + "+", + "EEE", + ] + + assert get_output_data(tmp_output) == expected_result + + +def test_filter_fastq_by_quality(tmp_input, tmp_output): + filter_fastq(input_fastq=tmp_input, output_fastq=tmp_output, quality_threshold=32) + + expected_result = [ + "@three", + "GGG", + "+", + "EEE", + ] + + assert get_output_data(tmp_output) == expected_result