diff --git a/Cargo.lock b/Cargo.lock index effb019..17c9895 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1729,7 +1729,7 @@ dependencies = [ "rand 0.9.0", "rand_distr 0.5.1", "rayon", - "single-utilities", + "single-utilities 0.6.0", "thiserror 2.0.9", ] @@ -1743,8 +1743,17 @@ dependencies = [ ] [[package]] -name = "single_algebra" +name = "single-utilities" version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd8b90e8aa6128de9b26903484c2cee7556cf342b190ea3570396e293d1991de" +dependencies = [ + "num-traits", +] + +[[package]] +name = "single_algebra" +version = "0.8.3" dependencies = [ "anyhow", "approx", @@ -1761,7 +1770,7 @@ dependencies = [ "rayon", "simba", "single-svdlib", - "single-utilities", + "single-utilities 0.7.0", "smartcore", ] diff --git a/Cargo.toml b/Cargo.toml index 7d0085a..d9d9c20 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "single_algebra" -version = "0.7.0" +version = "0.8.3" edition = "2021" license-file = "LICENSE.md" description = "A linear algebra convenience library for the single-rust library. Can be used externally as well." @@ -37,7 +37,7 @@ simba = { version = "0.9.0", optional = true } smartcore = { version = "0.4", features = ["ndarray-bindings"], optional = true } single-svdlib = { version = "1.0.4" } rand = "0.9.0" -single-utilities = "0.6.0" +single-utilities = "0.7.0" [dev-dependencies] criterion = "0.5.1" diff --git a/src/sparse/csc.rs b/src/sparse/csc.rs index 2b27a3b..937f863 100644 --- a/src/sparse/csc.rs +++ b/src/sparse/csc.rs @@ -2,11 +2,10 @@ use nalgebra_sparse::CscMatrix; use num_traits::{Float, NumCast, PrimInt, Unsigned, Zero}; use single_utilities::types::Direction; use std::collections::{HashMap, HashSet}; -use std::hash::Hash; use std::iter::Sum; -use std::ops::Add; use std::ops::AddAssign; +use crate::sparse::MatrixNTop; use crate::utils::Normalize; use super::{ @@ -359,8 +358,8 @@ where fn var_col(&self) -> anyhow::Result> where - I: PrimInt + Unsigned + Zero + AddAssign + Into, - T: Float + NumCast + AddAssign + std::iter::Sum, + I: PrimInt + Unsigned + Zero + AddAssign + Into + Send + Sync, + T: Float + NumCast + AddAssign + std::iter::Sum + Send + Sync, Self::Item: NumCast, { let sum: Vec = self.sum_col()?; @@ -385,8 +384,8 @@ where fn var_row(&self) -> anyhow::Result> where - I: PrimInt + Unsigned + Zero + AddAssign + Into, - T: Float + NumCast + AddAssign + std::iter::Sum, + I: PrimInt + Unsigned + Zero + AddAssign + Into + Send + Sync, + T: Float + NumCast + AddAssign + std::iter::Sum + Send + Sync, Self::Item: NumCast, { let sum: Vec = self.sum_row()?; @@ -410,8 +409,8 @@ where fn var_col_chunk(&self, reference: &mut [T]) -> anyhow::Result<()> where - I: PrimInt + Unsigned + Zero + AddAssign + Into, - T: Float + NumCast + AddAssign + std::iter::Sum, + I: PrimInt + Unsigned + Zero + AddAssign + Into + Send + Sync, + T: Float + NumCast + AddAssign + std::iter::Sum + Send + Sync, Self::Item: NumCast, { // Validate input slice length matches number of columns @@ -449,8 +448,8 @@ where fn var_row_chunk(&self, reference: &mut [T]) -> anyhow::Result<()> where - I: PrimInt + Unsigned + Zero + AddAssign + Into, - T: Float + NumCast + AddAssign + std::iter::Sum, + I: PrimInt + Unsigned + Zero + AddAssign + Into + Send + Sync, + T: Float + NumCast + AddAssign + std::iter::Sum + Send + Sync, Self::Item: NumCast, { // Validate input slice length matches number of rows @@ -488,8 +487,8 @@ where fn var_col_masked(&self, mask: &[bool]) -> anyhow::Result> where - I: PrimInt + Unsigned + Zero + AddAssign + Into, - T: Float + NumCast + AddAssign + Sum, + I: PrimInt + Unsigned + Zero + AddAssign + Into + Send + Sync, + T: Float + NumCast + AddAssign + Sum + Send + Sync, { // Validate mask length if mask.len() < self.nrows() { @@ -537,8 +536,8 @@ where fn var_row_masked(&self, mask: &[bool]) -> anyhow::Result> where - I: PrimInt + Unsigned + Zero + AddAssign + Into, - T: Float + NumCast + AddAssign + Sum, + I: PrimInt + Unsigned + Zero + AddAssign + Into + Send + Sync, + T: Float + NumCast + AddAssign + Sum + Send + Sync { // Validate mask length if mask.len() < self.ncols() { @@ -590,7 +589,7 @@ impl MatrixMinMax for CscMatrix fn min_max_col(&self) -> anyhow::Result<(Vec, Vec)> where - Item: NumCast + Copy + PartialOrd + NumericOps, + Item: NumCast + Copy + PartialOrd + NumericOps + Send + Sync, { let mut min: Vec = vec![Item::max_value(); self.ncols()]; let mut max: Vec = vec![Item::min_value(); self.ncols()]; @@ -601,7 +600,7 @@ impl MatrixMinMax for CscMatrix fn min_max_row(&self) -> anyhow::Result<(Vec, Vec)> where - Item: NumCast + Copy + PartialOrd + NumericOps, + Item: NumCast + Copy + PartialOrd + NumericOps + Send + Sync, { let mut min: Vec = vec![Item::max_value(); self.nrows()]; let mut max: Vec = vec![Item::min_value(); self.nrows()]; @@ -1027,6 +1026,41 @@ impl BatchMatrixMean for CscMatrix { } } +impl MatrixNTop for CscMatrix { + type Item = M; + + fn sum_row_n_top(&self, n: usize) -> anyhow::Result> + where + T: Float + NumCast + AddAssign + Sum { + let mut result = vec![T::zero(); self.nrows()]; + + let mut row_values: Vec> = vec![Vec::new(); self.nrows()]; + + for col_idx in 0..self.ncols() { + let col_start = self.col_offsets()[col_idx]; + let col_end = self.col_offsets()[col_idx + 1]; + + for idx in col_start..col_end { + let row_idx = self.row_indices()[idx]; + if let Some(val) = T::from(self.values()[idx]) { + row_values[row_idx].push(val); + } + } + } + + for (row_idx, mut values) in row_values.into_iter().enumerate() { + if values.len() <= n { + result[row_idx] = values.into_iter().sum(); + } else { + values.sort_by(|a, b| b.partial_cmp(a).unwrap_or(std::cmp::Ordering::Equal)); + result[row_idx] = values.into_iter().take(n).sum(); + } + } + + Ok(result) + } +} + #[cfg(test)] mod tests { use Direction; diff --git a/src/sparse/csr.rs b/src/sparse/csr.rs index b67b9f0..6cb070a 100644 --- a/src/sparse/csr.rs +++ b/src/sparse/csr.rs @@ -1,49 +1,125 @@ use std::collections::HashMap; -use std::hash::Hash; use std::iter::Sum; -use std::ops::{Add, AddAssign}; +use std::ops::AddAssign; +use std::sync::Mutex; use super::{ BatchMatrixMean, BatchMatrixVariance, MatrixMinMax, MatrixNonZero, MatrixSum, MatrixVariance, }; +use crate::sparse::MatrixNTop; use crate::utils::Normalize; use crate::utils::{BatchIdentifier, Log1P}; use anyhow::{anyhow, Ok}; use nalgebra_sparse::CsrMatrix; -use num_traits::{Float, NumCast, One, PrimInt, Unsigned, Zero}; +use num_traits::{Float, NumCast, PrimInt, Unsigned, Zero}; +use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; +use rayon::slice::ParallelSlice; use single_utilities::traits::{FloatOpsTS, NumericOps}; use single_utilities::types::Direction; +const PARALLEL_THRESHOLD: usize = 200_000; +const CHUNK_SIZE: usize = 512; + impl MatrixNonZero for CsrMatrix { fn nonzero_col(&self) -> anyhow::Result> where - T: PrimInt + Unsigned + Zero + AddAssign, + T: PrimInt + Unsigned + Zero + AddAssign + Send + Sync, { - let mut result = vec![T::zero(); self.ncols()]; - for &col_index in self.col_indices() { - result[col_index] += T::one(); + let n_cols = self.ncols(); + let col_indices = self.col_indices(); + let total_nnz = col_indices.len(); + + // Early return for empty matrix + if total_nnz == 0 || n_cols == 0 { + return Ok(vec![T::zero(); n_cols]); + } + + if let Some(&max_col) = col_indices.iter().max() { + if max_col >= n_cols { + return Err(anyhow::anyhow!( + "Invalid column index {} exceeds matrix column count {}", + max_col, + n_cols + )); + } + } + + if total_nnz < PARALLEL_THRESHOLD { + // Sequential implementation + let mut result = vec![T::zero(); n_cols]; + + for &col_idx in col_indices { + result[col_idx] += T::one(); + } + + Ok(result) + } else { + let result = col_indices + .par_chunks(8192) + .map(|chunk| { + let mut local_counts = vec![T::zero(); n_cols]; + for &col_idx in chunk { + local_counts[col_idx] += T::one(); + } + local_counts + }) + .reduce( + || vec![T::zero(); n_cols], + |mut acc, local| { + for (i, count) in local.into_iter().enumerate() { + acc[i] += count; + } + acc + }, + ); + + Ok(result) } - Ok(result) } fn nonzero_row(&self) -> anyhow::Result> where - T: PrimInt + Unsigned + Zero + AddAssign, + T: PrimInt + Unsigned + Zero + AddAssign + Send + Sync, { - let data = self - .row_offsets() - .windows(2) - .map(|window| { - let diff = window[1] - .checked_sub(window[0]) - .ok_or_else(|| anyhow!("Subtraction overflow")) - .expect("Subtraction overflow"); - T::from(diff) - .ok_or_else(|| anyhow!("Failed to convert to target type")) - .expect("Failed to convert to target type") - }) - .collect(); - Ok(data) + let row_offsets = self.row_offsets(); + let n_rows = self.nrows(); + + if n_rows == 0 { + return Ok(Vec::new()); + } + + if row_offsets.len() != n_rows + 1 { + return Err(anyhow::anyhow!( + "Invalid row offsets: expected {} elements, got {}", + n_rows + 1, + row_offsets.len() + )); + } + + if n_rows < PARALLEL_THRESHOLD { + let mut result = Vec::with_capacity(n_rows); + + for row in 0..n_rows { + let count = row_offsets[row + 1] - row_offsets[row]; + result.push(T::from(count).ok_or_else(|| { + anyhow::anyhow!("Count {} exceeds target type capacity", count) + })?); + } + + Ok(result) + } else { + let result: Result, anyhow::Error> = (0..n_rows) + .into_par_iter() + .map(|row| { + let count = row_offsets[row + 1] - row_offsets[row]; + T::from(count).ok_or_else(|| { + anyhow::anyhow!("Count {} exceeds target type capacity", count) + }) + }) + .collect(); + + result + } } fn nonzero_col_chunk(&self, reference: &mut [T]) -> anyhow::Result<()> @@ -112,9 +188,8 @@ impl MatrixNonZero for CsrMatrix { fn nonzero_row_masked(&self, mask: &[bool]) -> anyhow::Result> where - T: PrimInt + Unsigned + Zero + AddAssign, + T: PrimInt + Unsigned + Zero + AddAssign + Send + Sync, { - // Validate mask length if mask.len() < self.ncols() { return Err(anyhow::anyhow!( "Mask length ({}) is less than number of columns ({})", @@ -123,111 +198,198 @@ impl MatrixNonZero for CsrMatrix { )); } - let mut result = vec![T::zero(); self.nrows()]; + let n_rows = self.nrows(); + let row_offsets = self.row_offsets(); + let col_indices = self.col_indices(); - // Process each row - for row in 0..self.nrows() { - let row_start = self.row_offsets()[row]; - let row_end = self.row_offsets()[row + 1]; + if n_rows == 0 { + return Ok(Vec::new()); + } - // Count non-zero elements in this row that are in masked-in columns - for idx in row_start..row_end { - let col = self.col_indices()[idx]; + let masked_count = mask.iter().filter(|&&m| m).count(); + if masked_count == 0 { + return Ok(vec![T::zero(); n_rows]); + } + + let total_nnz = self.nnz(); + + if total_nnz < PARALLEL_THRESHOLD { + let mut result = Vec::with_capacity(n_rows); + + for row in 0..n_rows { + let start = row_offsets[row]; + let end = row_offsets[row + 1]; + let mut count = T::zero(); - // Skip this column if masked out - if !mask[col] { - continue; + for idx in start..end { + if mask[col_indices[idx]] { + count += T::one(); + } } - result[row] += T::one(); + result.push(count); } - } - Ok(result) + Ok(result) + } else { + let counts: Vec = (0..n_rows) + .into_par_iter() + .map(|row| { + let start = row_offsets[row]; + let end = row_offsets[row + 1]; + let mut count = T::zero(); + + for idx in start..end { + if mask[col_indices[idx]] { + count += T::one(); + } + } + + count + }) + .collect(); + + Ok(counts) + } } } -impl MatrixSum for CsrMatrix { +impl MatrixSum for CsrMatrix { type Item = M; fn sum_col(&self) -> anyhow::Result> where - T: Float + NumCast + AddAssign + std::iter::Sum, + T: Float + NumCast + AddAssign + std::iter::Sum + Send + Sync, Self::Item: NumCast, { - let mut result = vec![T::zero(); self.ncols()]; + let n_cols = self.ncols(); let col_indices = self.col_indices(); let values = self.values(); + let total_nnz = values.len(); - // Process values directly in chunks to better utilize cache - const CHUNK_SIZE: usize = 256; - for chunk_start in (0..values.len()).step_by(CHUNK_SIZE) { - let chunk_end = (chunk_start + CHUNK_SIZE).min(values.len()); + if total_nnz == 0 || n_cols == 0 { + return Ok(vec![T::zero(); n_cols]); + } - // Direct accumulation without temporary storage - for (&col_idx, &value) in col_indices[chunk_start..chunk_end] - .iter() - .zip(&values[chunk_start..chunk_end]) - { - result[col_idx] += T::from(value).unwrap(); + if total_nnz < PARALLEL_THRESHOLD { + let mut result = vec![T::zero(); n_cols]; + + for chunk_start in (0..total_nnz).step_by(CHUNK_SIZE) { + let chunk_end = (chunk_start + CHUNK_SIZE).min(total_nnz); + + for idx in chunk_start..chunk_end { + result[col_indices[idx]] += T::from(values[idx]).unwrap(); + } } - } - Ok(result) + Ok(result) + } else { + let result = (0..total_nnz) + .into_par_iter() + .chunks(8192) + .map(|chunk_indices| { + let mut local_sums = vec![T::zero(); n_cols]; + + for idx in chunk_indices { + if idx < total_nnz { + local_sums[col_indices[idx]] += T::from(values[idx]).unwrap(); + } + } + + local_sums + }) + .reduce( + || vec![T::zero(); n_cols], + |mut acc, local| { + for (i, val) in local.into_iter().enumerate() { + acc[i] += val; + } + acc + }, + ); + + Ok(result) + } } fn sum_row(&self) -> anyhow::Result> where - T: Float + NumCast + AddAssign + std::iter::Sum, + T: Float + NumCast + AddAssign + std::iter::Sum + Send + Sync, Self::Item: NumCast, { let nrows = self.nrows(); - let mut result = vec![T::zero(); nrows]; let values = self.values(); let row_offsets = self.row_offsets(); - // Process in chunks of 4 rows when possible - let chunk_size = 4; - let chunks = nrows / chunk_size; - let remainder = nrows % chunk_size; + if nrows == 0 { + return Ok(Vec::new()); + } - // Process chunks - for chunk in 0..chunks { - let base = chunk * chunk_size; - let mut sums = [M::zero(); 4]; + let total_nnz = values.len(); - // Process 4 rows at once to improve instruction-level parallelism - (0..chunk_size).enumerate().for_each(|(i, offset)| { - let row = base + offset; - let start = row_offsets[row]; - let end = row_offsets[row + 1]; + if total_nnz < PARALLEL_THRESHOLD { + let mut result = Vec::with_capacity(nrows); - // Direct sum in original type - for &val in &values[start..end] { - sums[i] += val; - } - }); + const ROW_CHUNK: usize = 64; - // Convert results for the chunk - sums.iter().enumerate().for_each(|(i, &sum)| { - result[base + i] = T::from(sum).unwrap(); - }); - } + for row_chunk in (0..nrows).step_by(ROW_CHUNK) { + let chunk_end = (row_chunk + ROW_CHUNK).min(nrows); - // Handle remaining rows - let base = chunks * chunk_size; - for row in base..nrows { - let start = row_offsets[row]; - let end = row_offsets[row + 1]; - let mut sum = M::zero(); + for row in row_chunk..chunk_end { + let start = row_offsets[row]; + let end = row_offsets[row + 1]; + let row_values = &values[start..end]; + let sum = if row_values.len() < 16 { + row_values.iter().map(|&v| T::from(v).unwrap()).sum::() + } else { + let mut sum = T::zero(); + let chunks = row_values.chunks_exact(4); + let remainder = chunks.remainder(); - for &val in &values[start..end] { - sum += val; + for chunk in chunks { + sum += T::from(chunk[0]).unwrap(); + sum += T::from(chunk[1]).unwrap(); + sum += T::from(chunk[2]).unwrap(); + sum += T::from(chunk[3]).unwrap(); + } + + for &val in remainder { + sum += T::from(val).unwrap(); + } + + sum + }; + + result.push(sum); + } } - result[row] = T::from(sum).unwrap(); - } - Ok(result) + Ok(result) + } else { + let sums: Vec = (0..nrows) + .into_par_iter() + .map(|row| { + let start = row_offsets[row]; + let end = row_offsets[row + 1]; + let row_values = &values[start..end]; + + if row_values.len() < 32 { + row_values.iter().map(|&v| T::from(v).unwrap()).sum::() + } else { + let mut sum = T::zero(); + + for chunk in row_values.chunks(8) { + let chunk_sum: T = chunk.iter().map(|&v| T::from(v).unwrap()).sum(); + sum += chunk_sum; + } + + sum + } + }) + .collect(); + + Ok(sums) + } } fn sum_col_chunk(&self, reference: &mut [T]) -> anyhow::Result<()> @@ -256,9 +418,8 @@ impl MatrixSum for CsrMatrix { fn sum_col_masked(&self, mask: &[bool]) -> anyhow::Result> where - T: Float + NumCast + AddAssign + Sum, + T: Float + NumCast + AddAssign + Sum + Send + Sync, { - // Validate mask length if mask.len() < self.nrows() { return Err(anyhow::anyhow!( "Mask length ({}) is less than number of rows ({})", @@ -267,34 +428,70 @@ impl MatrixSum for CsrMatrix { )); } - let mut result = vec![T::zero(); self.ncols()]; + let n_cols = self.ncols(); + let row_offsets = self.row_offsets(); + let col_indices = self.col_indices(); + let values = self.values(); - // Process each row - for row in 0..self.nrows() { - // Skip this row if masked out - if !mask[row] { - continue; - } + let masked_count = mask.iter().filter(|&&m| m).count(); - let row_start = self.row_offsets()[row]; - let row_end = self.row_offsets()[row + 1]; + if masked_count == 0 { + return Ok(vec![T::zero(); n_cols]); + } - // Process all non-zero elements in this row - for idx in row_start..row_end { - let col = self.col_indices()[idx]; - let value = T::from(self.values()[idx]).unwrap(); - result[col] += value; + if masked_count < PARALLEL_THRESHOLD { + let mut result = vec![T::zero(); n_cols]; + + for (row, &is_included) in mask.iter().enumerate() { + if is_included { + let start = row_offsets[row]; + let end = row_offsets[row + 1]; + + for idx in start..end { + result[col_indices[idx]] += T::from(values[idx]).unwrap(); + } + } } - } - Ok(result) + Ok(result) + } else { + let result = (0..mask.len()) + .into_par_iter() + .chunks(256) + .map(|row_chunk| { + let mut local_sums = vec![T::zero(); n_cols]; + + for row in row_chunk { + if mask[row] { + let start = row_offsets[row]; + let end = row_offsets[row + 1]; + + for idx in start..end { + local_sums[col_indices[idx]] += T::from(values[idx]).unwrap(); + } + } + } + + local_sums + }) + .reduce( + || vec![T::zero(); n_cols], + |mut acc, local| { + for (i, val) in local.into_iter().enumerate() { + acc[i] += val; + } + acc + }, + ); + + Ok(result) + } } fn sum_row_masked(&self, mask: &[bool]) -> anyhow::Result> where - T: Float + NumCast + AddAssign + Sum, + T: Float + NumCast + AddAssign + Sum + Send + Sync, { - // Validate mask length if mask.len() < self.ncols() { return Err(anyhow::anyhow!( "Mask length ({}) is less than number of columns ({})", @@ -303,28 +500,60 @@ impl MatrixSum for CsrMatrix { )); } - let mut result = vec![T::zero(); self.nrows()]; + let n_rows = self.nrows(); + let row_offsets = self.row_offsets(); + let col_indices = self.col_indices(); + let values = self.values(); - // Process each row - for row in 0..self.nrows() { - let row_start = self.row_offsets()[row]; - let row_end = self.row_offsets()[row + 1]; + if n_rows == 0 { + return Ok(Vec::new()); + } - // Process all non-zero elements in this row - for idx in row_start..row_end { - let col = self.col_indices()[idx]; + let masked_count = mask.iter().filter(|&&m| m).count(); + if masked_count == 0 { + return Ok(vec![T::zero(); n_rows]); + } + + let total_nnz = self.nnz(); + + if total_nnz < PARALLEL_THRESHOLD { + let mut result = Vec::with_capacity(n_rows); + + for row in 0..n_rows { + let start = row_offsets[row]; + let end = row_offsets[row + 1]; + let mut sum = T::zero(); - // Skip this column if masked out - if !mask[col] { - continue; + for idx in start..end { + if mask[col_indices[idx]] { + sum += T::from(values[idx]).unwrap(); + } } - let value = T::from(self.values()[idx]).unwrap(); - result[row] += value; + result.push(sum); } - } - Ok(result) + Ok(result) + } else { + let sums: Vec = (0..n_rows) + .into_par_iter() + .map(|row| { + let start = row_offsets[row]; + let end = row_offsets[row + 1]; + let mut sum = T::zero(); + + for idx in start..end { + if mask[col_indices[idx]] { + sum += T::from(values[idx]).unwrap(); + } + } + + sum + }) + .collect(); + + Ok(sums) + } } fn sum_col_squared(&self) -> anyhow::Result> @@ -366,7 +595,7 @@ where fn var_col(&self) -> anyhow::Result> where I: PrimInt + Unsigned + Zero + AddAssign + Into, - T: Float + NumCast + AddAssign + Sum, + T: Float + NumCast + AddAssign + Sum + Send + Sync, { let sum: Vec = self.sum_col()?; let squared_sums: Vec = self.sum_col_squared()?; @@ -391,7 +620,7 @@ where fn var_row(&self) -> anyhow::Result> where I: PrimInt + Unsigned + Zero + AddAssign + Into, - T: Float + NumCast + AddAssign + Sum, + T: Float + NumCast + AddAssign + Sum + Send + Sync, Self::Item: NumCast, { let sum: Vec = self.sum_row()?; @@ -416,8 +645,8 @@ where /// Calculate column-wise variance and store results in the provided slice fn var_col_chunk(&self, reference: &mut [T]) -> anyhow::Result<()> where - I: PrimInt + Unsigned + Zero + AddAssign + Into, - T: Float + NumCast + AddAssign + Sum, + I: PrimInt + Unsigned + Zero + AddAssign + Into + Send + Sync, + T: Float + NumCast + AddAssign + Sum + Send + Sync, Self::Item: NumCast, { let ncols = self.ncols(); @@ -455,8 +684,8 @@ where fn var_row_chunk(&self, reference: &mut [T]) -> anyhow::Result<()> where - I: PrimInt + Unsigned + Zero + AddAssign + Into, - T: Float + NumCast + AddAssign + Sum, + I: PrimInt + Unsigned + Zero + AddAssign + Into + Send + Sync, + T: Float + NumCast + AddAssign + Sum + Send + Sync, Self::Item: NumCast, { let nrows = self.nrows(); @@ -503,8 +732,8 @@ where fn var_col_masked(&self, mask: &[bool]) -> anyhow::Result> where - I: PrimInt + Unsigned + Zero + AddAssign + Into, - T: Float + NumCast + AddAssign + Sum, + I: PrimInt + Unsigned + Zero + AddAssign + Into + Send + Sync, + T: Float + NumCast + AddAssign + Sum + Send + Sync, { // Validate mask length if mask.len() < self.nrows() { @@ -552,8 +781,8 @@ where fn var_row_masked(&self, mask: &[bool]) -> anyhow::Result> where - I: PrimInt + Unsigned + Zero + AddAssign + Into, - T: Float + NumCast + AddAssign + Sum, + I: PrimInt + Unsigned + Zero + AddAssign + Into + Send + Sync, + T: Float + NumCast + AddAssign + Sum + Send + Sync, { // Validate mask length if mask.len() < self.ncols() { @@ -605,7 +834,7 @@ impl MatrixMinMax for CsrMatrix fn min_max_col(&self) -> anyhow::Result<(Vec, Vec)> where - Item: NumCast + Copy + PartialOrd + NumericOps, + Item: NumCast + Copy + PartialOrd + NumericOps + Send + Sync, { let mut min: Vec = vec![Item::max_value(); self.ncols()]; let mut max: Vec = vec![Item::min_value(); self.ncols()]; @@ -616,7 +845,7 @@ impl MatrixMinMax for CsrMatrix fn min_max_row(&self) -> anyhow::Result<(Vec, Vec)> where - Item: NumCast + Copy + PartialOrd + NumericOps, + Item: NumCast + Copy + PartialOrd + NumericOps + Send + Sync, { let mut min: Vec = vec![Item::max_value(); self.nrows()]; let mut max: Vec = vec![Item::min_value(); self.nrows()]; @@ -627,7 +856,7 @@ impl MatrixMinMax for CsrMatrix fn min_max_col_chunk(&self, reference: (&mut [Item], &mut [Item])) -> anyhow::Result<()> where - Item: NumCast + Copy + PartialOrd + NumericOps, + Item: NumCast + Copy + PartialOrd + NumericOps + Send + Sync, { let (min_vals, max_vals) = reference; @@ -1033,6 +1262,37 @@ impl BatchMatrixMean for CsrMatrix { } } +impl MatrixNTop for CsrMatrix { + type Item = M; + + fn sum_row_n_top(&self, n: usize) -> anyhow::Result> + where + T: Float + NumCast + AddAssign + Sum, + { + let mut result = vec![T::zero(); self.nrows()]; + + for row_idx in 0..self.nrows() { + let row_start = self.row_offsets()[row_idx]; + let row_end = self.row_offsets()[row_idx + 1]; + + let mut row_values: Vec = Vec::new(); + for idx in row_start..row_end { + if let Some(val) = T::from(self.values()[idx]) { + row_values.push(val); + } + } + + if row_values.len() <= n { + result[row_idx] = row_values.into_iter().sum(); + } else { + row_values.sort_by(|a, b| b.partial_cmp(a).unwrap_or(std::cmp::Ordering::Equal)); + result[row_idx] = row_values.into_iter().take(n).sum(); + } + } + Ok(result) + } +} + #[cfg(test)] mod tests { use Direction; diff --git a/src/sparse/mod.rs b/src/sparse/mod.rs index df39848..0dc2e8b 100644 --- a/src/sparse/mod.rs +++ b/src/sparse/mod.rs @@ -1,10 +1,9 @@ use std::collections::HashMap; -use std::hash::Hash; use std::ops::AddAssign; +use crate::utils::BatchIdentifier; use num_traits::{Float, NumCast, PrimInt, Unsigned, Zero}; use single_utilities::traits::NumericOps; -use crate::utils::BatchIdentifier; pub mod csc; pub mod csr; @@ -12,29 +11,29 @@ pub mod csr; pub trait MatrixNonZero { fn nonzero_col(&self) -> anyhow::Result> where - T: PrimInt + Unsigned + Zero + AddAssign; + T: PrimInt + Unsigned + Zero + AddAssign + Send + Sync; fn nonzero_row(&self) -> anyhow::Result> where - T: PrimInt + Unsigned + Zero + AddAssign; + T: PrimInt + Unsigned + Zero + AddAssign + Send + Sync; fn nonzero_col_chunk(&self, reference: &mut [T]) -> anyhow::Result<()> where - T: PrimInt + Unsigned + Zero + AddAssign; + T: PrimInt + Unsigned + Zero + AddAssign + Send + Sync; fn nonzero_row_chunk(&self, reference: &mut [T]) -> anyhow::Result<()> where - T: PrimInt + Unsigned + Zero + AddAssign; + T: PrimInt + Unsigned + Zero + AddAssign + Send + Sync; /// Calculate masked non-zero counts for columns fn nonzero_col_masked(&self, mask: &[bool]) -> anyhow::Result> where - T: PrimInt + Unsigned + Zero + AddAssign; + T: PrimInt + Unsigned + Zero + AddAssign + Send + Sync; /// Calculate masked non-zero counts for rows fn nonzero_row_masked(&self, mask: &[bool]) -> anyhow::Result> where - T: PrimInt + Unsigned + Zero + AddAssign; + T: PrimInt + Unsigned + Zero + AddAssign + Send + Sync; } pub trait MatrixSum { @@ -42,36 +41,36 @@ pub trait MatrixSum { fn sum_col(&self) -> anyhow::Result> where - T: Float + num_traits::NumCast + AddAssign + std::iter::Sum; + T: Float + num_traits::NumCast + AddAssign + std::iter::Sum + Send + Sync; fn sum_row(&self) -> anyhow::Result> where - T: Float + num_traits::NumCast + AddAssign + std::iter::Sum; + T: Float + num_traits::NumCast + AddAssign + std::iter::Sum + Send + Sync; fn sum_col_chunk(&self, reference: &mut [T]) -> anyhow::Result<()> where - T: Float + num_traits::NumCast + AddAssign + std::iter::Sum; + T: Float + num_traits::NumCast + AddAssign + std::iter::Sum + Send + Sync; fn sum_row_chunk(&self, reference: &mut [T]) -> anyhow::Result<()> where - T: Float + num_traits::NumCast + AddAssign + std::iter::Sum; + T: Float + num_traits::NumCast + AddAssign + std::iter::Sum + Send + Sync; fn sum_col_masked(&self, mask: &[bool]) -> anyhow::Result> where - T: Float + NumCast + AddAssign + std::iter::Sum; + T: Float + NumCast + AddAssign + std::iter::Sum + Send + Sync; /// Calculate masked sum for rows fn sum_row_masked(&self, mask: &[bool]) -> anyhow::Result> where - T: Float + NumCast + AddAssign + std::iter::Sum; - - fn sum_col_squared(&self) -> anyhow::Result> + T: Float + NumCast + AddAssign + std::iter::Sum + Send + Sync; + + fn sum_col_squared(&self) -> anyhow::Result> where - T: Float + NumCast + AddAssign + std::iter::Sum; + T: Float + NumCast + AddAssign + std::iter::Sum + Send + Sync; fn sum_row_squared(&self) -> anyhow::Result> where - T: Float + NumCast + AddAssign + std::iter::Sum; + T: Float + NumCast + AddAssign + std::iter::Sum + Send + Sync; } pub trait MatrixVariance { @@ -79,35 +78,35 @@ pub trait MatrixVariance { fn var_col(&self) -> anyhow::Result> where - I: PrimInt + Unsigned + Zero + AddAssign + Into, - T: Float + num_traits::NumCast + AddAssign + std::iter::Sum; + I: PrimInt + Unsigned + Zero + AddAssign + Into + Send + Sync, + T: Float + num_traits::NumCast + AddAssign + std::iter::Sum + Send + Sync; fn var_row(&self) -> anyhow::Result> where - I: PrimInt + Unsigned + Zero + AddAssign + Into, - T: Float + num_traits::NumCast + AddAssign + std::iter::Sum; + I: PrimInt + Unsigned + Zero + AddAssign + Into + Send + Sync, + T: Float + num_traits::NumCast + AddAssign + std::iter::Sum + Send + Sync; fn var_col_chunk(&self, reference: &mut [T]) -> anyhow::Result<()> where - I: PrimInt + Unsigned + Zero + AddAssign + Into, - T: Float + num_traits::NumCast + AddAssign + std::iter::Sum; + I: PrimInt + Unsigned + Zero + AddAssign + Into + Send + Sync, + T: Float + num_traits::NumCast + AddAssign + std::iter::Sum + Send + Sync; fn var_row_chunk(&self, reference: &mut [T]) -> anyhow::Result<()> where - I: PrimInt + Unsigned + Zero + AddAssign + Into, - T: Float + num_traits::NumCast + AddAssign + std::iter::Sum; + I: PrimInt + Unsigned + Zero + AddAssign + Into + Send + Sync, + T: Float + num_traits::NumCast + AddAssign + std::iter::Sum + Send + Sync; /// Calculate masked variance for columns fn var_col_masked(&self, mask: &[bool]) -> anyhow::Result> where - I: PrimInt + Unsigned + Zero + AddAssign + Into, - T: Float + NumCast + AddAssign + std::iter::Sum; + I: PrimInt + Unsigned + Zero + AddAssign + Into + Send + Sync, + T: Float + NumCast + AddAssign + std::iter::Sum + Send + Sync; /// Calculate masked variance for rows fn var_row_masked(&self, mask: &[bool]) -> anyhow::Result> where - I: PrimInt + Unsigned + Zero + AddAssign + Into, - T: Float + NumCast + AddAssign + std::iter::Sum; + I: PrimInt + Unsigned + Zero + AddAssign + Into + Send + Sync, + T: Float + NumCast + AddAssign + std::iter::Sum + Send + Sync; } pub trait MatrixMinMax { @@ -115,19 +114,19 @@ pub trait MatrixMinMax { fn min_max_col(&self) -> anyhow::Result<(Vec, Vec)> where - Item: NumCast + Copy + PartialOrd + NumericOps; + Item: NumCast + Copy + PartialOrd + NumericOps + Send + Sync; fn min_max_row(&self) -> anyhow::Result<(Vec, Vec)> where - Item: NumCast + Copy + PartialOrd + NumericOps; + Item: NumCast + Copy + PartialOrd + NumericOps + Send + Sync; fn min_max_col_chunk(&self, reference: (&mut [Item], &mut [Item])) -> anyhow::Result<()> where - Item: NumCast + Copy + PartialOrd + NumericOps; + Item: NumCast + Copy + PartialOrd + NumericOps + Send + Sync; fn min_max_row_chunk(&self, reference: (&mut [Item], &mut [Item])) -> anyhow::Result<()> where - Item: NumCast + Copy + PartialOrd + NumericOps; + Item: NumCast + Copy + PartialOrd + NumericOps + Send + Sync; } pub trait BatchMatrixVariance { @@ -137,14 +136,14 @@ pub trait BatchMatrixVariance { fn var_batch_row(&self, batches: &[B]) -> anyhow::Result>> where I: PrimInt + Unsigned + Zero + AddAssign + Into, - T: Float + NumCast + AddAssign + std::iter::Sum, + T: Float + NumCast + AddAssign + std::iter::Sum + Send + Sync, B: BatchIdentifier; /// Calculate column-wise variance for each batch fn var_batch_col(&self, batches: &[B]) -> anyhow::Result>> where I: PrimInt + Unsigned + Zero + AddAssign + Into, - T: Float + NumCast + AddAssign + std::iter::Sum, + T: Float + NumCast + AddAssign + std::iter::Sum + Send + Sync, B: BatchIdentifier; } @@ -154,14 +153,20 @@ pub trait BatchMatrixMean { /// Calculate row-wise mean for each batch fn mean_batch_row(&self, batches: &[B]) -> anyhow::Result>> where - T: Float + NumCast + AddAssign + std::iter::Sum, + T: Float + NumCast + AddAssign + std::iter::Sum + Send + Sync, B: BatchIdentifier; /// Calculate column-wise mean for each batch fn mean_batch_col(&self, batches: &[B]) -> anyhow::Result>> where - T: Float + NumCast + AddAssign + std::iter::Sum, + T: Float + NumCast + AddAssign + std::iter::Sum + Send + Sync, B: BatchIdentifier; } +pub trait MatrixNTop { + type Item: NumCast; + fn sum_row_n_top(&self, n: usize) -> anyhow::Result> + where + T: Float + NumCast + AddAssign + std::iter::Sum + Send + Sync; +}