diff --git a/genom.py b/fastq_filter.py similarity index 63% rename from genom.py rename to fastq_filter.py index 2a6d89f..17f25d5 100644 --- a/genom.py +++ b/fastq_filter.py @@ -1,6 +1,12 @@ import Bio.SeqIO import Bio.SeqUtils import os +import click +import logging +import sys + + +logger = logging.getLogger() def _in_bounds(value, bounds): @@ -12,7 +18,7 @@ def _in_bounds(value, bounds): def filter_fastq(input_fastq: str, output_fastq: str, gc_bounds: int | tuple = (0, 100), length_bounds: int | tuple = (0, 2**32), - quality_threshold: int = 0) -> dict: + quality_threshold: int = 0): """ Function filter_fastq(input_fastq, output_fastq, gc_bounds,length_bounds, quality_threshold) returns new file with fastq sequnces according to @@ -37,17 +43,17 @@ def filter_fastq(input_fastq: str, output_fastq: str, default 0(scale Phred33). Reads with quality lower then treshold are decline. """ + if not os.path.isfile(input_fastq): + logger.error("ERROR: Can not find file %s", input_fastq) + raise Exception(f'Can not find file {input_fastq}') + parent_dir = os.path.dirname(input_fastq) new_file_path = os.path.join(parent_dir, 'filtered', output_fastq) - '''Checks the name of new file is no the same as name of other files - in directory. If it is the same, returns 'Please insert other name - for output file, file is exsist' - ''' - if os.path.isfile(new_file_path): - return 'Please insert other name for output file, file is exsist' + + c = 0 with open(new_file_path, "w") as output: - for rec in Bio.SeqIO.parse(input, "fastq"): + for rec in Bio.SeqIO.parse(input_fastq, "fastq"): seq = rec.seq quality = rec.letter_annotations["phred_quality"] @@ -56,8 +62,9 @@ def filter_fastq(input_fastq: str, output_fastq: str, cond3 = sum(quality) / len(quality) >= quality_threshold if cond1 and cond2 and cond3: Bio.SeqIO.write(rec, output, "fastq") + c += 1 - return 'Filtering is complete' + logger.info("INFO from %s to %s filteren %s records", input_fastq, output_fastq, c) class BiologicalSequence: @@ -130,3 +137,70 @@ def check_alphabet(self): if c not in valid: return False return True + + + +@click.group() +def main(): + pass + + +@click.command() +@click.option("-q", "--quality", default=0, type=click.IntRange(min=0, max_open=True)) +@click.option("-gc-low", default=0, type=click.IntRange(min=0, max=100)) +@click.option("-gc-high", default=100, type=click.IntRange(min=0, max=100)) +@click.option("-len-low", default=0, type=click.IntRange(min=0, max=2**32)) +@click.option("-len-high", default=2**32, type=click.IntRange(min=0, max=2**32)) +@click.argument("infile") +@click.argument("outfile") +def filter(quality, gc_low, gc_high, len_low, len_high, infile, outfile): + try: + filter_fastq(infile, outfile, + gc_bounds=(gc_low, gc_high), + length_bounds=(len_low, len_high), + quality_threshold=quality) + except Exception as e: + print(e, file=sys.stderr) + + +@click.command() +@click.argument("cmd", type=click.Choice(["compl", "reverse", "complreverse", "transcribe"])) +@click.argument("seq") +def dna(cmd, seq): + x = DNASequence(seq) + if not x.check_alphabet(): + print("invalid DNA sequence", file=sys.stderr) + return 1 + if cmd == "compl": + print(x.complement()) + elif cmd == "reverse": + print(x.reverse()) + elif cmd == "complreverse": + print(x.reverse_complement()) + elif cmd == "transcribe": + print(x.transcribe()) + + +@click.command() +@click.argument("cmd", type=click.Choice(["compl", "reverse", "complreverse"])) +@click.argument("seq") +def rna(cmd, seq): + x = RNASequence(seq) + if not x.check_alphabet(): + print("invalid RNA sequence", file=sys.stderr) + return 1 + if cmd == "compl": + print(x.complement()) + elif cmd == "reverse": + print(x.reverse()) + elif cmd == "complreverse": + print(x.reverse_complement()) + + +main.add_command(filter) +main.add_command(dna) +main.add_command(rna) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/fastq_filter_test.py b/fastq_filter_test.py new file mode 100644 index 0000000..3a22385 --- /dev/null +++ b/fastq_filter_test.py @@ -0,0 +1,93 @@ +import unittest +import fastq_filter as genom +import tempfile +import os + + +class DnaTests(unittest.TestCase): + def test_check_alphabet(self): + dna = genom.DNASequence("ATGC") + self.assertTrue(dna.check_alphabet()) + dna = genom.DNASequence("ATGCU") + self.assertFalse(dna.check_alphabet()) + + def test_complement(self): + dna = genom.DNASequence("ATGC") + compl = dna.complement() + self.assertEqual(compl.seq, "TACG") + + def test_reverse(self): + dna = genom.DNASequence("ATGC") + rev = dna.reverse() + self.assertEqual(rev.seq, "CGTA") + + +class RnaTests(unittest.TestCase): + def test_check_alphabet(self): + rna = genom.RNASequence("AUGC") + self.assertTrue(rna.check_alphabet()) + rna = genom.RNASequence("ATGCU") + self.assertFalse(rna.check_alphabet()) + + def test_complement(self): + rna = genom.RNASequence("AUGC") + compl = rna.complement() + self.assertEqual(compl.seq, "UACG") + + def test_reverse(self): + rna = genom.RNASequence("AUGC") + rev = rna.reverse() + self.assertEqual(rev.seq, "CGUA") + + +class FilterTests(unittest.TestCase): + seq = """@SRR1363257.37 GWZHISEQ01:153:C1W31ACXX:5:1101:14027:2198 length=101 +GGTTGCAGATTCGCAGTGTCGCTGTTCCAGCGCATCACATCTTTGATGTTCACGCCGTGGCGTTTAGCAATGCTTGAAAGCGAATCGCCTTTGCCCACACG ++ +@?:=:;DBFADH;CAECEE@@E:FFHGAE4?C?DE?>FHE4BFFIIFHIBABEECA83;>>@>@CCCDC9@@CC08<@?@BB@9:CC####### +""" + + def test_filter_no_file(self): + with self.assertRaises(Exception): + # where is no such file + genom.filter_fastq("---------------") + + def test_filter_id(self): + name_in = tempfile.mktemp(suffix=".fastq") + name_out = tempfile.mktemp(suffix=".fastq") + try: + with open(name_in, "w") as f: + f.write(self.seq) + genom.filter_fastq(name_in, name_out) + self.assertTrue(os.path.isfile(name_out)) + + with open(name_out, "r") as f: + res = f.read() + self.assertTrue(res.strip() == self.seq.strip()) + finally: + if os.path.isfile(name_in): + os.remove(name_in) + if os.path.isfile(name_out): + os.remove(name_out) + + def test_filter_all(self): + name_in = tempfile.mktemp(suffix=".fastq") + name_out = tempfile.mktemp(suffix=".fastq") + try: + with open(name_in, "w") as f: + f.write(self.seq) + genom.filter_fastq(name_in, name_out, length_bounds=(102, 1000)) + self.assertTrue(os.path.isfile(name_out)) + + with open(name_out, "r") as f: + res = f.read() + self.assertTrue(res.strip() == "") + finally: + if os.path.isfile(name_in): + os.remove(name_in) + if os.path.isfile(name_out): + os.remove(name_out) + + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..73ad1d9 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +biopython==1.85 +click==8.1.8