diff --git a/.github/workflows/unittest.yml b/.github/workflows/unittest.yml index 2aa138e..d5c8472 100644 --- a/.github/workflows/unittest.yml +++ b/.github/workflows/unittest.yml @@ -23,8 +23,8 @@ jobs: unit-tests: strategy: matrix: - os: [ubuntu-latest, macos-11] - python-version: ["3.7", "3.8", "3.9", "3.10"] + os: [ubuntu-latest] + python-version: ["3.10", "3.11", "3.12"] runs-on: ${{ matrix.os }} diff --git a/.gitignore b/.gitignore index 651f63c..236a0eb 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ *~ *__pycache__/ *egg-info/ +env/ diff --git a/blnm/fit.py b/blnm/fit.py index a252004..199e14c 100644 --- a/blnm/fit.py +++ b/blnm/fit.py @@ -1,6 +1,8 @@ """Fitting routines. """ import numpy as np +import numpy.typing as npt +import typing from scipy.stats import binom from scipy import special as scisp @@ -11,7 +13,7 @@ INTEGRAL_N_SAMPLES = 1000 -def _data_log_likelihood(zeroth_order): +def _data_log_likelihood(zeroth_order: npt.NDArray) -> float: """Compute the data log likelihood. Args: @@ -22,13 +24,17 @@ def _data_log_likelihood(zeroth_order): """ return np.sum(np.log(np.sum(zeroth_order, axis=0))) -def _e_step(x_counts, n_counts, - coefs, means, variance, k_mixtures, - zeroth_order, - first_order, - second_order, - integral_n_samples, - seed): +def _e_step(x_counts: npt.NDArray, + n_counts: npt.NDArray, + coefs: npt.NDArray, + means: npt.NDArray, + variance: float, + k_mixtures: int, + zeroth_order: npt.NDArray, + first_order: npt.NDArray, + second_order: npt.NDArray, + integral_n_samples: int, + seed: typing.Any) -> None: """E step of EM algorithm. Updates the conditional expectation for the zeroth, @@ -109,16 +115,22 @@ def _m_step(zeroth_order, return coefs, means, variance -# return (coefs, means, variance) -def blnm(x_counts, - n_counts, - k_mixtures, - seed=None, - tolerance=1e-6, - max_iter=1000, - disp = True, - integral_n_samples = INTEGRAL_N_SAMPLES): +def init_pars(): + raise NotImplementedError + + +def blnm(x_counts: npt.NDArray, + n_counts: npt.NDArray, + k_mixtures: int, + coefs: npt.NDArray | None = None, + means: npt.NDArray | None = None, + variance: float | None = None, + seed: None | int | np.random._generator.Generator = None, + tolerance:float = 1e-6, + max_iter:int = 1000, + disp: bool = True, + integral_n_samples: int = INTEGRAL_N_SAMPLES) -> dict: """Fit mixture of BLN models. Args: @@ -127,6 +139,17 @@ def blnm(x_counts, n_counts : ((N,) np.ndarray) alternative + reference allele specific expression counts k_mixtures : (int) > 0 number of mixtures to fit + coefs: ((k_mixtures,) np.ndarray) + The weights for each BLN probability mass funciton in the + mixture. Each coefficient must be greater than 0 and the + sum of coefficients be 1. If None, pick coefficients randomly + subject to our contraints. + means: ((k_mixtures,) np.ndarray) + The mean parameter for each BLN probability mass function. + This can be any real number. If None, pick coefficients randomly. + variance: (float) + A real number greater than zero representing the variance parameter + of each BLN probability mass function. seed : (any input to numpy random Generator object) tolerance : (float) criterion for convergence max_iter : (int) maximum number of interations @@ -170,13 +193,39 @@ def blnm(x_counts, rng = np.random.default_rng(seed=seed) # initialize parameters - coefs = rng.uniform(low=0.1, high=0.9, size=k_mixtures) - coefs = coefs / np.sum(coefs) - - p = rng.uniform(low=0.01, high=0.99, size=k_mixtures) - means = np.log(p / (1-p)) + if coefs is None: + coefs = rng.uniform(low=0.1, high=0.9, size=k_mixtures) + coefs = coefs / np.sum(coefs) + + # verify coefs + if (coefs < 0).any(): + raise ValueError("All coefficients must be positive.") + elif (np.isnan(coefs)).any(): + raise ValueError("All coefficients must be positive.") + elif (s := np.sum(coefs)) < 0.9999 or s > 1.0001: + raise ValueError("The sum of coefficients must be 1.") + elif coefs.size != k_mixtures: + raise ValueError("The number of coefficients must be" + " equal to the number of k_mixtures.") + + if means is None: + p = rng.uniform(low=0.01, high=0.99, size=k_mixtures) + means = np.log(p / (1-p)) + + if means.size != k_mixtures: + raise ValueError("The number of means must be" + " equal to the number of k_mixtures.") + elif (np.isnan(means)).any(): + raise ValueError("The number of means must be" + " equal to the number of k_mixtures.") + + + if variance is None: + variance = rng.uniform(low=0.1, high=3) + + if variance <= 0 or np.isnan(variance): + raise ValueError("Variance parameter must be a float greater than zero.") - variance = rng.uniform(low=0.1, high=3) # preallocate memory for arrays constructed in the E step # each array represents diff --git a/setup.cfg b/setup.cfg index a8db752..13f5b48 100644 --- a/setup.cfg +++ b/setup.cfg @@ -10,7 +10,7 @@ author = Robert Vogel [options] packages = blnm -python_requires = >= 3.7 +python_requires = >= 3.10 install_requires = numpy >= 1.14.0 scipy >= 1.0.0 diff --git a/test/test_fit.py b/test/test_fit.py new file mode 100644 index 0000000..7a44624 --- /dev/null +++ b/test/test_fit.py @@ -0,0 +1,99 @@ +from unittest import TestCase, main + +import numpy as np +from blnm import fit + + + +class TestInitPars(TestCase): + def test_not_implemented(self): + with self.assertRaises(NotImplementedError): + fit.init_pars() + + +class TestFitParCheck(TestCase): + def setUp(self): + rng = np.random.default_rng() + + self.n = 300 + self.k_mixtures = 4 + self.coefs = np.array([0.1,0.4,0.3,0.2]) + self.means = np.array([-2,-1,1,2], dtype=float) + self.variance = 1.1 + self.xcount = rng.choice(250, size=self.n, replace=True) + self.ncount = np.full((self.n,), 250) + + def test_correct_input(self): + fit.blnm(self.xcount, self.ncount, self.k_mixtures, + coefs=self.coefs, means=self.means, + variance=self.variance) + + def test_check_coefs_negative_val(self): + self.coefs[2] = -self.coefs[2] + + with self.assertRaises(ValueError): + fit.blnm(self.xcount, self.ncount, self.k_mixtures, + coefs=self.coefs, means=self.means, + variance=self.variance) + + def test_check_coefs_wrong_number(self): + self.coefs = self.coefs[0:2] + + with self.assertRaises(ValueError): + fit.blnm(self.xcount, self.ncount, self.k_mixtures, + coefs=self.coefs, means=self.means, + variance=self.variance) + + + def test_check_coefs_wrong_number(self): + self.coefs[2] = self.coefs[2] * 1.1 + + with self.assertRaises(ValueError): + fit.blnm(self.xcount, self.ncount, self.k_mixtures, + coefs=self.coefs, means=self.means, + variance=self.variance) + + def test_check_coefs_nan(self): + self.coefs[2] = np.nan + + with self.assertRaises(ValueError): + fit.blnm(self.xcount, self.ncount, self.k_mixtures, + coefs=self.coefs, means=self.means, + variance=self.variance) + + def test_check_means_wrong_number(self): + self.means = self.means[0:2] + + with self.assertRaises(ValueError): + fit.blnm(self.xcount, self.ncount, self.k_mixtures, + coefs=self.coefs, means=self.means, + variance=self.variance) + + + def test_check_means_wrong_number(self): + self.means[2] = np.nan + + with self.assertRaises(ValueError): + fit.blnm(self.xcount, self.ncount, self.k_mixtures, + coefs=self.coefs, means=self.means, + variance=self.variance) + + def test_check_variance(self): + self.variance = -1.2 + + with self.assertRaises(ValueError): + fit.blnm(self.xcount, self.ncount, self.k_mixtures, + coefs=self.coefs, means=self.means, + variance=self.variance) + + def test_check_nan(self): + self.variance = np.nan + + with self.assertRaises(ValueError): + fit.blnm(self.xcount, self.ncount, self.k_mixtures, + coefs=self.coefs, means=self.means, + variance=self.variance) + + +if __name__ == "__main__": + main()