From 5a60dc756ca334beb551bc953882b52df7574cb7 Mon Sep 17 00:00:00 2001 From: Noam Teyssier <22600644+noamteyssier@users.noreply.github.com> Date: Sat, 16 Aug 2025 12:08:35 -0700 Subject: [PATCH 1/4] dep: added binseq dependency --- Cargo.lock | 67 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 1 + 2 files changed, 68 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 9c96ef8..6a9cd2b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -89,6 +89,17 @@ dependencies = [ "wait-timeout", ] +[[package]] +name = "auto_impl" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffdcb70bdbc4d478427380519163274ac86e52916e10f0a8889adf0f96d3fee7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "autocfg" version = "1.5.0" @@ -127,12 +138,36 @@ dependencies = [ "virtue", ] +[[package]] +name = "binseq" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d83da7b79813e69bdb80b978548adb6081dfac2a7e7e595dad26d70338359cc" +dependencies = [ + "anyhow", + "auto_impl", + "bitnuc", + "bytemuck", + "byteorder", + "memmap2", + "num_cpus", + "rand", + "thiserror 2.0.14", + "zstd", +] + [[package]] name = "bitflags" version = "2.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b8e56985ec62d17e9c1001dc89c88ecd7dc08e47eba5ec7c29c7b5eeecde967" +[[package]] +name = "bitnuc" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57a47910c6b1c40def8f66e535056915fc8eab20d053b19e8c2b236558ec00a7" + [[package]] name = "bstr" version = "1.12.0" @@ -171,6 +206,12 @@ version = "1.23.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3995eaeebcdf32f91f980d360f78732ddc061097ab4e39991ae7a6ace9194677" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bzip2" version = "0.4.4" @@ -326,6 +367,7 @@ dependencies = [ "anyhow", "assert_cmd", "bincode", + "binseq", "clap", "flate2", "indicatif", @@ -487,6 +529,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hermit-abi" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" + [[package]] name = "indexmap" version = "2.10.0" @@ -635,6 +683,15 @@ version = "2.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" +[[package]] +name = "memmap2" +version = "0.9.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "483758ad303d734cec05e5c12b41d7e93e6a6390c5e9dae6bdeb7c1259012d28" +dependencies = [ + "libc", +] + [[package]] name = "miniz_oxide" version = "0.8.9" @@ -689,6 +746,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "num_cpus" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "number_prefix" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index b97b2d8..df2b6c5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ parking_lot = "0.12" zstd = "0.13" indicatif = "0.17" liblzma = "0.3.1" +binseq = "0.6.5" [lints.clippy] too_many_arguments = "allow" From 137e3d9c5512178fbbfc0b119bf5433c8bc8bbda Mon Sep 17 00:00:00 2001 From: Noam Teyssier <22600644+noamteyssier@users.noreply.github.com> Date: Sat, 16 Aug 2025 12:10:09 -0700 Subject: [PATCH 2/4] feat(filter): added binseq support to filter struct --- src/filter.rs | 150 +++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 147 insertions(+), 3 deletions(-) diff --git a/src/filter.rs b/src/filter.rs index f89047c..56e6326 100644 --- a/src/filter.rs +++ b/src/filter.rs @@ -1,5 +1,6 @@ use crate::{FilterConfig, index::load_minimizer_hashes}; use anyhow::{Context, Result}; +use binseq::ParallelReader as BinseqParallelReader; use flate2::write::GzEncoder; use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle}; use liblzma::write::XzEncoder; @@ -90,6 +91,44 @@ fn format_record_to_buffer( Ok(()) } +fn format_binseq_record_to_buffer( + record: &Rf, + decoded_seq: &[u8], + extended: bool, + buffer: &mut Vec, +) -> Result<()> { + let is_fasta = record.has_quality(); + + // write marker + if is_fasta { + buffer.write_all(b">")?; + } else { + buffer.write_all(b"@")?; + } + + // write header + buffer.extend_from_slice(record.index().to_string().as_bytes()); + buffer.write_all(b"\n")?; + + // Sequence line + buffer.extend_from_slice(decoded_seq); + + if is_fasta { + buffer.write_all(b"\n")?; + } else { + // FASTQ: plus line and quality + buffer.write_all(b"\n+\n")?; + let qual = if extended { + record.xqual() + } else { + record.squal() + }; + buffer.extend_from_slice(qual); + buffer.write_all(b"\n")?; + } + Ok(()) +} + /// Validate compression level for the given format fn validate_compression_level(level: u8, min: u8, max: u8, format: &str) -> Result<()> { if level < min || level > max { @@ -198,6 +237,8 @@ struct FilterProcessor { local_buffer2: Vec, // Second buffer for paired output local_stats: ProcessingStats, filter_buffers: FilterBuffers, + sbuf: Vec, // decoding buffer for primary sequence + xbuf: Vec, // decoding buffer for extended sequence // Global state global_writer: Arc>, @@ -268,6 +309,8 @@ impl FilterProcessor { debug: config.debug, local_buffer: Vec::with_capacity(DEFAULT_BUFFER_SIZE), local_buffer2: Vec::with_capacity(DEFAULT_BUFFER_SIZE), + sbuf: Vec::default(), + xbuf: Vec::default(), local_stats: ProcessingStats::default(), filter_buffers: FilterBuffers::default(), global_writer: Arc::new(Mutex::new(writer)), @@ -794,6 +837,93 @@ impl PairedParallelProcessor for FilterProcessor { } } +impl binseq::ParallelProcessor for FilterProcessor { + fn process_record(&mut self, record: Rf) -> binseq::Result<()> { + // clear decoding buffers + self.sbuf.clear(); + self.xbuf.clear(); + + let (should_keep, hit_count, total_minimizers, hit_kmers) = if record.is_paired() { + self.local_stats.total_seqs += 1; + self.local_stats.total_bp += record.slen() + record.xlen(); + record.decode_s(&mut self.sbuf)?; + record.decode_x(&mut self.xbuf)?; + self.should_keep_pair(self.sbuf.as_ref(), self.xbuf.as_ref()) + } else { + self.local_stats.total_seqs += 1; + self.local_stats.total_bp += record.slen(); + record.decode_s(&mut self.sbuf)?; + Self::should_keep_sequence( + self.sbuf.as_ref(), + &mut self.filter_buffers, + self.kmer_length, + self.window_size, + self.deplete, + self.prefix_length, + self.minimizer_hashes.clone(), + self.debug, + self.abs_threshold, + self.rel_threshold, + ) + }; + + // Show debug info for sequences with hits + if self.debug { + eprintln!( + "DEBUG: record {} hits={}/{} keep={} kmers=[{}]", + record.index(), + hit_count, + total_minimizers, + should_keep, + hit_kmers.join(",") + ); + } + + let seq = &self.sbuf; + if should_keep { + self.local_stats.output_bp += seq.len() as u64; + self.local_stats.output_seq_counter += 1; + format_binseq_record_to_buffer(&record, &seq, false, &mut self.local_buffer)?; + } else { + self.local_stats.filtered_seqs += 1; + self.local_stats.filtered_bp += seq.len() as u64; + } + + Ok(()) + } + + fn on_batch_complete(&mut self) -> binseq::Result<()> { + // Write buffer to output + if !self.local_buffer.is_empty() { + let mut global_writer = self.global_writer.lock(); + global_writer.write_all(&self.local_buffer)?; + global_writer.flush()?; + } + + // Clear buffer after releasing the lock for better performance + self.local_buffer.clear(); + + // Update global stats + { + let mut stats = self.global_stats.lock(); + stats.total_seqs += self.local_stats.total_seqs; + stats.filtered_seqs += self.local_stats.filtered_seqs; + stats.total_bp += self.local_stats.total_bp; + stats.output_bp += self.local_stats.output_bp; + stats.filtered_bp += self.local_stats.filtered_bp; + stats.output_seq_counter += self.local_stats.output_seq_counter; + } + + // Update spinner + self.update_spinner(); + + // Reset local stats + self.local_stats = ProcessingStats::default(); + + Ok(()) + } +} + pub fn run(config: &FilterConfig) -> Result<()> { let start_time = Instant::now(); let version: String = env!("CARGO_PKG_VERSION").to_string(); @@ -822,7 +952,16 @@ pub fn run(config: &FilterConfig) -> Result<()> { } else if config.input2_path.is_some() { input_type.push_str("paired"); } else { - input_type.push_str("single"); + if config.input_path.ends_with("bq") | config.input_path.ends_with(".vbq") { + let reader = binseq::BinseqReader::new(&config.input_path)?; + if reader.is_paired() { + input_type.push_str("paired"); + } else { + input_type.push_str("single"); + } + } else { + input_type.push_str("single"); + } } options.push(format!( "abs_threshold={}, rel_threshold={}", @@ -926,8 +1065,13 @@ pub fn run(config: &FilterConfig) -> Result<()> { r1_reader.process_parallel_paired(r2_reader, processor.clone(), num_threads)?; } else { // Single file or stdin - let reader = create_paraseq_reader(Some(config.input_path))?; - reader.process_parallel(processor.clone(), num_threads)?; + if config.input_path.ends_with(".bq") | config.input_path.ends_with(".vbq") { + let reader = binseq::BinseqReader::new(&config.input_path)?; + reader.process_parallel(processor.clone(), num_threads)?; + } else { + let reader = create_paraseq_reader(Some(config.input_path))?; + reader.process_parallel(processor.clone(), num_threads)?; + } } // Get final stats From 275423ffc7a8b8526e9804dd4eda2d1f09bc6bf9 Mon Sep 17 00:00:00 2001 From: Noam Teyssier <22600644+noamteyssier@users.noreply.github.com> Date: Sat, 16 Aug 2025 12:10:56 -0700 Subject: [PATCH 3/4] refactor(filter): make should_keep_sequence an associated function to avoid mutable and immutable usage --- src/filter.rs | 94 ++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 70 insertions(+), 24 deletions(-) diff --git a/src/filter.rs b/src/filter.rs index 56e6326..79d080c 100644 --- a/src/filter.rs +++ b/src/filter.rs @@ -268,20 +268,31 @@ struct FilterBuffers { impl FilterProcessor { /// Calculate required hits based on absolute and relative thresholds - fn calculate_required_hits(&self, total_minimizers: usize) -> usize { - let abs_required = self.abs_threshold; + fn calculate_required_hits( + total_minimizers: usize, + abs_threshold: usize, + rel_threshold: f64, + ) -> usize { + let abs_required = abs_threshold; let rel_required = if total_minimizers == 0 { 0 } else { - ((self.rel_threshold * total_minimizers as f64).round() as usize).max(1) + ((rel_threshold * total_minimizers as f64).round() as usize).max(1) }; abs_required.max(rel_required) } /// Check if sequence meets filtering criteria - fn meets_filtering_criteria(&self, hit_count: usize, total_minimizers: usize) -> bool { - let required = self.calculate_required_hits(total_minimizers); - if self.deplete { + fn meets_filtering_criteria( + hit_count: usize, + total_minimizers: usize, + deplete: bool, + abs_threshold: usize, + rel_threshold: f64, + ) -> bool { + let required = + Self::calculate_required_hits(total_minimizers, abs_threshold, rel_threshold); + if deplete { hit_count < required } else { hit_count >= required @@ -321,14 +332,25 @@ impl FilterProcessor { } } - fn should_keep_sequence(&mut self, seq: &[u8]) -> (bool, usize, usize, Vec) { - if seq.len() < self.kmer_length as usize { - return (self.deplete, 0, 0, Vec::new()); // If too short, keep if in deplete mode + fn should_keep_sequence( + seq: &[u8], + filter_buffers: &mut FilterBuffers, + kmer_length: u8, + window_size: u8, + deplete: bool, + prefix_length: usize, + minimizer_hashes: Arc>, + debug: bool, + abs_threshold: usize, + rel_threshold: f64, + ) -> (bool, usize, usize, Vec) { + if seq.len() < kmer_length as usize { + return (deplete, 0, 0, Vec::new()); // If too short, keep if in deplete mode } // Apply prefix length limit if specified - let effective_seq = if self.prefix_length > 0 && seq.len() > self.prefix_length { - &seq[..self.prefix_length] + let effective_seq = if prefix_length > 0 && seq.len() > prefix_length { + &seq[..prefix_length] } else { seq }; @@ -341,7 +363,7 @@ impl FilterProcessor { invalid_mask, positions, minimizer_values, - } = &mut self.filter_buffers; + } = filter_buffers; packed_seq.clear(); minimizer_values.clear(); @@ -376,15 +398,15 @@ impl FilterProcessor { // let mut positions = Vec::new(); simd_minimizers::canonical_minimizer_positions( packed_seq.as_slice(), - self.kmer_length as usize, - self.window_size as usize, + kmer_length as usize, + window_size as usize, positions, ); assert!( - self.kmer_length <= 56, + kmer_length <= 56, "Indexing the bitmask of invalid characters requires k<=56, but it is {}", - self.kmer_length + kmer_length ); // Filter positions to only include k-mers with ACGT bases @@ -392,7 +414,7 @@ impl FilterProcessor { // Extract bits pos .. pos+k from the bitmask. // mask of k ones in low positions. - let mask = u64::MAX >> (64 - self.kmer_length); + let mask = u64::MAX >> (64 - kmer_length); let byte = pos as usize / 8; let offset = pos as usize % 8; // The unaligned u64 read is OK, because we ensure that the underlying `Vec` always @@ -406,7 +428,7 @@ impl FilterProcessor { minimizer_values.extend( simd_minimizers::iter_canonical_minimizer_values( packed_seq.as_slice(), - self.kmer_length as usize, + kmer_length as usize, positions, ) .map(|kmer| xxhash_rust::xxh3::xxh3_64(&kmer.to_le_bytes())), @@ -420,19 +442,25 @@ impl FilterProcessor { let mut hit_kmers = Vec::new(); for (i, &hash) in minimizer_values.iter().enumerate() { - if self.minimizer_hashes.contains(&hash) && seen_hits.insert(hash) { + if minimizer_hashes.contains(&hash) && seen_hits.insert(hash) { hit_count += 1; // Extract the k-mer sequence at this position - if self.debug && i < positions.len() { + if debug && i < positions.len() { let pos = positions[i] as usize; - let kmer = &effective_seq[pos..pos + self.kmer_length as usize]; + let kmer = &effective_seq[pos..pos + kmer_length as usize]; hit_kmers.push(String::from_utf8_lossy(kmer).to_string()); } } } ( - self.meets_filtering_criteria(hit_count, num_minimizers), + Self::meets_filtering_criteria( + hit_count, + num_minimizers, + deplete, + abs_threshold, + rel_threshold, + ), hit_count, num_minimizers, hit_kmers, @@ -542,7 +570,13 @@ impl FilterProcessor { let total_minimizers = all_hashes.len(); ( - self.meets_filtering_criteria(pair_hit_count, total_minimizers), + Self::meets_filtering_criteria( + pair_hit_count, + total_minimizers, + self.deplete, + self.abs_threshold, + self.rel_threshold, + ), pair_hit_count, total_minimizers, hit_kmers, @@ -613,7 +647,19 @@ impl ParallelProcessor for FilterProcessor { self.local_stats.total_seqs += 1; self.local_stats.total_bp += seq.len() as u64; - let (should_keep, hit_count, total_minimizers, hit_kmers) = self.should_keep_sequence(&seq); + let filter_buffers = &mut self.filter_buffers; + let (should_keep, hit_count, total_minimizers, hit_kmers) = Self::should_keep_sequence( + &seq, + filter_buffers, + self.kmer_length, + self.window_size, + self.deplete, + self.prefix_length, + self.minimizer_hashes.clone(), + self.debug, + self.abs_threshold, + self.rel_threshold, + ); // Show debug info for sequences with hits if self.debug { From ffcee8630ba767b0962912cf988f3764edf84fb0 Mon Sep 17 00:00:00 2001 From: Noam Teyssier <22600644+noamteyssier@users.noreply.github.com> Date: Sat, 16 Aug 2025 12:24:58 -0700 Subject: [PATCH 4/4] refactor(filter): pass in reference to minimizer_hashes instead of cloning arc --- src/filter.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/filter.rs b/src/filter.rs index 79d080c..3c74895 100644 --- a/src/filter.rs +++ b/src/filter.rs @@ -339,7 +339,7 @@ impl FilterProcessor { window_size: u8, deplete: bool, prefix_length: usize, - minimizer_hashes: Arc>, + minimizer_hashes: &FxHashSet, debug: bool, abs_threshold: usize, rel_threshold: f64, @@ -655,7 +655,7 @@ impl ParallelProcessor for FilterProcessor { self.window_size, self.deplete, self.prefix_length, - self.minimizer_hashes.clone(), + &self.minimizer_hashes, self.debug, self.abs_threshold, self.rel_threshold, @@ -906,7 +906,7 @@ impl binseq::ParallelProcessor for FilterProcessor { self.window_size, self.deplete, self.prefix_length, - self.minimizer_hashes.clone(), + &self.minimizer_hashes, self.debug, self.abs_threshold, self.rel_threshold,