diff --git a/.gitignore b/.gitignore index e2ec908..09f2ca5 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,5 @@ docs/_*/ build/ dist/ -.DS_Store \ No newline at end of file +.DS_Store +*.egg-info \ No newline at end of file diff --git a/dedenser/dedenser.py b/dedenser/dedenser.py index f494f3c..8b29aca 100644 --- a/dedenser/dedenser.py +++ b/dedenser/dedenser.py @@ -1,20 +1,21 @@ -"""Dedenser class for downsampling chemical point clouds. -""" +"""Dedenser class for downsampling chemical point clouds.""" + import os -os.environ["OMP_NUM_THREADS"] = "1" +import random +import alphashape +import matplotlib.pyplot as plt import numpy as np -import random -from sklearn.cluster import HDBSCAN +import point_cloud_utils as pcu from scipy.spatial import ConvexHull -from sklearn.mixture import GaussianMixture from scipy.spatial.distance import cdist -import point_cloud_utils as pcu -import alphashape -import matplotlib.pyplot as plt +from sklearn.cluster import HDBSCAN +from sklearn.mixture import GaussianMixture + +os.environ["OMP_NUM_THREADS"] = "1" -class Dedenser( ): +class Dedenser: """Class handles chemical point cloud downsampling. `Dedenser` takes a user defined target percent to @@ -28,9 +29,9 @@ class Dedenser( ): target : float, default=None Target percentage for the chemical point cloud to be downsampled to. - + random_seed : int, default=1 - Random seed to be used by point cloud utils. + Random seed to be used by point cloud utils. alpha : bool, default=False Determine if alphashapes (concave hulls) are used @@ -58,7 +59,7 @@ class Dedenser( ): as 0 (`True`). show : bool, default=False - When set to True, will display clusters (colored) and + When set to True, will display clusters (colored) and noise (black) after HDBSCAN clustering. Attributes @@ -98,26 +99,42 @@ class Dedenser( ): downsample() The main method that initiates the downsampling process. """ - def __init__(self,data=None, target=.5, random_seed=1, alpha=False, - min_size = 5, d_weight = None, v_weight = None, epsilon=0.0, - strict=False, show=False, GUI=False): + + def __init__( + self, + data=None, + target=0.5, + random_seed=1, + alpha=False, + min_size=5, + d_weight=None, + v_weight=None, + epsilon=0.0, + strict=False, + show=False, + GUI=False, + ): if target in (0, 1): - raise ValueError(f'Target can not be {target}.') + raise ValueError(f"Target can not be {target}.") if target > 1 or target < 0: - raise ValueError(f'Target must be between 0 & 1.\ - User provided {target}') + raise ValueError( + f"Target must be between 0 & 1.\ + User provided {target}" + ) if data.shape[1] != 3: if show: - raise ValueError('Visualization with high dimensional data is not supported.') + raise ValueError( + "Visualization with high dimensional data is not supported." + ) self.n_target = target * len(data) self.data = np.array(data) - #if not np.issubdtype(data.dtype, np.number): \\this isnt for kids anymore\\ + # if not np.issubdtype(data.dtype, np.number): \\this isnt for kids anymore\\ # raise ValueError("Data must contain numerics.") self.Strict = strict self.random_seed = random_seed self.alpha = alpha self.keep_list = [] - self.state = '' + self.state = "" self.clusters = [] self.vol = 0 self.epsilon = epsilon @@ -127,14 +144,16 @@ def __init__(self,data=None, target=.5, random_seed=1, alpha=False, self.d_weight = d_weight self.v_weight = v_weight self.Weighted = False - if self.v_weight != None or self.d_weight != None: + if self.v_weight is not None or self.d_weight is not None: self.Weighted = True self.Show = show self.GUI = GUI def start(self): """Performs clustering and calculates cluster volumes.""" - hdbscan = HDBSCAN(min_cluster_size=self.min_size,cluster_selection_epsilon=self.epsilon) + hdbscan = HDBSCAN( + min_cluster_size=self.min_size, cluster_selection_epsilon=self.epsilon + ) self.keep_list = list(self.keep_list) self.clusters = hdbscan.fit(self.data).labels_ c = 0 @@ -143,10 +162,12 @@ def start(self): self.keep_list.append(c) c += 1 self.keep_list = list(set(self.keep_list)) - for cluster in range(0, max(self.clusters)+1): - c_points = [self.data[i] for i, x in enumerate(self.clusters) if x == cluster] + for cluster in range(0, max(self.clusters) + 1): + c_points = [ + self.data[i] for i, x in enumerate(self.clusters) if x == cluster + ] if self.alpha: - #alpha = alphashape.optimizealpha(c_points) + # alpha = alphashape.optimizealpha(c_points) hull = alphashape.alphashape(c_points, 0.15) self.vol += abs(hull.volume) self.vols.append(abs(hull.volume)) @@ -157,32 +178,36 @@ def start(self): if self.Show: fig = plt.figure() - ax = fig.add_subplot(111, projection='3d') - ax.set_xlabel('UMAP_1') + ax = fig.add_subplot(111, projection="3d") + ax.set_xlabel("UMAP_1") ax.set_ylabel("UMAP_2") ax.set_zlabel("UMAP_3") - c_points = np.array([self.data[i] for i, x in enumerate(self.clusters) if x == -1]) + c_points = np.array( + [self.data[i] for i, x in enumerate(self.clusters) if x == -1] + ) if len(c_points) != 0: - ax.scatter(c_points[:, 0], c_points[:, 1], c_points[:, 2],s=.3,c='k') - for cluster in range(0, max(self.clusters)+1): - c_points = np.array([self.data[i] for i, x in enumerate(self.clusters) if x == cluster]) - ax.scatter(c_points[:, 0], c_points[:, 1], c_points[:, 2],s=.3) + ax.scatter(c_points[:, 0], c_points[:, 1], c_points[:, 2], s=0.3, c="k") + for cluster in range(0, max(self.clusters) + 1): + c_points = np.array( + [self.data[i] for i, x in enumerate(self.clusters) if x == cluster] + ) + ax.scatter(c_points[:, 0], c_points[:, 1], c_points[:, 2], s=0.3) plt.tight_layout() - #plt.savefig('data/initial_cloud.svg',dpi=1000) + # plt.savefig('data/initial_cloud.svg',dpi=1000) plt.show() - print('done start') + print("done start") def check_state(self): """Cheks the state after using HDBSCAN. - + Downsamples the entire chemical cloud if HDBSCAN selected too much noise given the target or continues with the dedensing algorithm otherwise. """ - if self.n_target*.95 < len(self.keep_list):#can make a variable threshold + if self.n_target * 0.95 < len(self.keep_list): # can make a variable threshold - self.state = 'noisy' + self.state = "noisy" else: - self.state = 'thinner' + self.state = "thinner" self.r_target = self.n_target - len(self.keep_list) def thinner(self): @@ -192,36 +217,45 @@ def thinner(self): Performs downsampling using Poisson disk sampling for larger clusters. """ c_lens, c_pointses, c_is, cds = [], [], [], [] - for cluster in range(0,max(self.clusters)+1): + for cluster in range(0, max(self.clusters) + 1): c_index = [i for i, x in enumerate(self.clusters) if x == cluster] c_is.append(c_index) c_lens.append(len(c_index)) - c_points = [self.data[i] for i, x in enumerate(self.clusters) if x == cluster] + c_points = [ + self.data[i] for i, x in enumerate(self.clusters) if x == cluster + ] cpl = [] for point in c_points: cpl.append(list(point)) c_pointses.append(cpl) - cds.append(len(c_index)/self.vols[cluster]) - #print(np.mean(np.array(c_points)[:,0]),np.mean(np.array(c_points)[:,1]),np.mean(np.array(c_points)[:,2])) - #print(len(c_index)/self.vols[cluster], self.vols[cluster], len(c_index)) - #estimate how many points to be removed per cluster + cds.append(len(c_index) / self.vols[cluster]) + # print(np.mean(np.array(c_points)[:,0]),np.mean(np.array(c_points)[:,1]),np.mean(np.array(c_points)[:,2])) + # print(len(c_index)/self.vols[cluster], self.vols[cluster], len(c_index)) + # estimate how many points to be removed per cluster Check_Targs = True while Check_Targs: - Check_Targs, c_is, c_pointses, cds = self.manage_targets(c_is,c_pointses,cds) + Check_Targs, c_is, c_pointses, cds = self.manage_targets( + c_is, c_pointses, cds + ) o_targ = 0 - for cvol, cind, points, cd in zip(self.vols,c_is,c_pointses,cds): - targ = round(self.r_target*cvol/sum(self.vols)) + for cvol, cind, points, cd in zip(self.vols, c_is, c_pointses, cds): + targ = round(self.r_target * cvol / sum(self.vols)) if self.Weighted: - if self.d_weight != None: + if self.d_weight is not None: w = self.d_weight - d1 = np.exp(w * (cd/sum(cds) - 1)) - dt = sum([np.exp(w * (cdn/sum(cds) - 1)) for cdn in cds]) - targ = round(self.r_target*(d1/dt)) + d1 = np.exp(w * (cd / sum(cds) - 1)) + dt = sum([np.exp(w * (cdn / sum(cds) - 1)) for cdn in cds]) + targ = round(self.r_target * (d1 / dt)) else: w = self.v_weight - v1 = np.exp(w * (cvol/sum(self.vols) - 1)) - vt = sum([np.exp(w * (cvols/sum(self.vols) - 1)) for cvols in self.vols]) - targ = round(self.r_target*(v1/vt)) + v1 = np.exp(w * (cvol / sum(self.vols) - 1)) + vt = sum( + [ + np.exp(w * (cvols / sum(self.vols) - 1)) + for cvols in self.vols + ] + ) + targ = round(self.r_target * (v1 / vt)) if targ == 0: pass if targ < 15: @@ -233,13 +267,13 @@ def thinner(self): targ = 1 gmm = GaussianMixture(n_components=1, random_state=0) gmm.fit(points) - #get the centroid of the fitted GMM + # get the centroid of the fitted GMM centroid = gmm.means_[0] center = np.argmin(cdist(points, [centroid])) if targ == 1: inds = [center] elif targ != 0: - inds = self.choice(cind,targ) + inds = self.choice(cind, targ) if not (center in inds): ri = random.randint(0, len(inds) - 1) inds = list(inds) @@ -251,35 +285,45 @@ def thinner(self): self.keep_list.append(cind[ind]) elif targ != 0: target_radius = -1 - idx = pcu.downsample_point_cloud_poisson_disk(np.array(points), target_radius,targ, - random_seed=self.random_seed, - sample_num_tolerance=0.01) + idx = pcu.downsample_point_cloud_poisson_disk( + np.array(points), + target_radius, + targ, + random_seed=self.random_seed, + sample_num_tolerance=0.01, + ) self.keep_list = list(self.keep_list) for ind in idx: self.keep_list.append(cind[ind]) self.keep_list = np.array(self.keep_list) - def manage_targets(self,c_is,c_pointses,cds): + def manage_targets(self, c_is, c_pointses, cds): """Checks for clusters with fewer points than their target value. Updates the remaining targets and clusters by keeping all points from clusters - with fewer points than their target, and removing them from the running list of clusters. + with fewer points than their target, and removing them from the running list + of clusters. """ New_Targs = False n_vols, n_is, n_pointses, n_cds = [], [], [], [] - for cvol, cind, points, cd in zip(self.vols,c_is,c_pointses,cds): - targ = round(self.r_target*cvol/sum(self.vols)) + for cvol, cind, points, cd in zip(self.vols, c_is, c_pointses, cds): + targ = round(self.r_target * cvol / sum(self.vols)) if self.Weighted: - if self.d_weight != None: + if self.d_weight is not None: w = self.d_weight - d1 = np.exp(w * (cd/sum(cds) - 1)) - dt = sum([np.exp(w * (cdn/sum(cds) - 1)) for cdn in cds]) - targ = round(self.r_target*(d1/dt)) + d1 = np.exp(w * (cd / sum(cds) - 1)) + dt = sum([np.exp(w * (cdn / sum(cds) - 1)) for cdn in cds]) + targ = round(self.r_target * (d1 / dt)) else: w = self.v_weight - v1 = np.exp(w * (cvol/sum(self.vols) - 1)) - vt = sum([np.exp(w * (cvols/sum(self.vols) - 1)) for cvols in self.vols]) - targ = round(self.r_target*(v1/vt)) + v1 = np.exp(w * (cvol / sum(self.vols) - 1)) + vt = sum( + [ + np.exp(w * (cvols / sum(self.vols) - 1)) + for cvols in self.vols + ] + ) + targ = round(self.r_target * (v1 / vt)) if targ > len(cind): New_Targs = True self.keep_list = list(self.keep_list) @@ -298,20 +342,18 @@ def manage_targets(self,c_is,c_pointses,cds): cds = n_cds self.r_target = self.n_target - len(self.keep_list) return New_Targs, c_is, c_pointses, cds - - def choice(self,indexs,targ): - """Helper method for numpy randomchoice for the sake of readability. - """ - return np.random.choice(len(indexs),targ) + def choice(self, indexs, targ): + """Helper method for numpy randomchoice for the sake of readability.""" + return np.random.choice(len(indexs), targ) def downsample(self): """The main method that initiates and runs the downsampling process. Calls the `start()` method to perform clustering and calculate cluster volumes and then checks the state of the downsampling process with `check_state()`. - If the state is `'noisy'`, the noise from HDBSCAN is downsampled based on its volume - prior to the downsampling of clusters using the `thinner()` method. + If the state is `'noisy'`, the noise from HDBSCAN is downsampled based on its + volume prior to the downsampling of clusters using the `thinner()` method. If the state is `'thinner'`, performs downsampling using the `thinner()` method. After `thinner()` is run, it returns the list of point cloud indexes to be kept. @@ -324,16 +366,20 @@ def downsample(self): self.start() print(f"Target of {int(round(self.n_target))} molecules") self.check_state() - if self.state == 'noisy': + if self.state == "noisy": noise = self.data[np.array(self.keep_list)] if self.alpha: noise_v = abs(alphashape.alphashape(noise, 0.15).volume) else: noise_v = ConvexHull(noise).volume - no_targ = round(noise_v/(noise_v+sum(self.vols)) * self.n_target * .8) - idx = pcu.downsample_point_cloud_poisson_disk(np.array(noise), -1,no_targ, - random_seed=self.random_seed, - sample_num_tolerance=0.01) + no_targ = round(noise_v / (noise_v + sum(self.vols)) * self.n_target * 0.8) + idx = pcu.downsample_point_cloud_poisson_disk( + np.array(noise), + -1, + no_targ, + random_seed=self.random_seed, + sample_num_tolerance=0.01, + ) self.keep_list = np.array(self.keep_list) self.keep_list = self.keep_list[np.array(idx)] self.keep_list = list(set(self.keep_list)) @@ -342,21 +388,33 @@ def downsample(self): print("Downsampled to {0} molecules".format(len(self.keep_list))) if self.Show: fig = plt.figure() - ax = fig.add_subplot(111, projection='3d') - ax.set_xlabel('UMAP_1') + ax = fig.add_subplot(111, projection="3d") + ax.set_xlabel("UMAP_1") ax.set_ylabel("UMAP_2") ax.set_zlabel("UMAP_3") - c_points = np.array([self.data[i] for i, x in enumerate(self.clusters) if x == -1 and i in self.keep_list]) + c_points = np.array( + [ + self.data[i] + for i, x in enumerate(self.clusters) + if x == -1 and i in self.keep_list + ] + ) if len(c_points) != 0: - ax.scatter(c_points[:, 0], c_points[:, 1], c_points[:, 2],s=.3,c='k') - for cluster in range(0, max(self.clusters)+1): - c_points = np.array([self.data[i] for i, x in enumerate(self.clusters) if x == cluster and i in self.keep_list]) + ax.scatter(c_points[:, 0], c_points[:, 1], c_points[:, 2], s=0.3, c="k") + for cluster in range(0, max(self.clusters) + 1): + c_points = np.array( + [ + self.data[i] + for i, x in enumerate(self.clusters) + if x == cluster and i in self.keep_list + ] + ) if len(c_points) != 0: - ax.scatter(c_points[:, 0], c_points[:, 1], c_points[:, 2],s=.3) + ax.scatter(c_points[:, 0], c_points[:, 1], c_points[:, 2], s=0.3) else: - ax.scatter([], [], [],s=.3) + ax.scatter([], [], [], s=0.3) plt.tight_layout() - #plt.savefig('data/downsampled_cloud.tif',dpi=1000) + # plt.savefig('data/downsampled_cloud.tif',dpi=1000) plt.show() if self.GUI: return list(set(self.keep_list)), self.clusters diff --git a/dedenser/mcloud.py b/dedenser/mcloud.py index d5ff9d5..8c3495f 100644 --- a/dedenser/mcloud.py +++ b/dedenser/mcloud.py @@ -1,16 +1,16 @@ -"""Utils for making chemical point clouds, visualization, and saving results. -""" +"""Utils for making chemical point clouds, visualization, and saving results.""" import pandas as pd import numpy as np + def drop_non_numeric_columns(df): """ Drop non-numeric columns from a DataFrame. Arguments --------- - df : pd.DataFrame + df : pd.DataFrame The input DataFrame. Returns @@ -22,14 +22,15 @@ def drop_non_numeric_columns(df): # Iterate over each column in the DataFrame for column in df.columns: # Check if the column contains any non-numeric values - if pd.to_numeric(df[column], errors='coerce').isnull().any(): + if pd.to_numeric(df[column], errors="coerce").isnull().any(): non_numeric_columns.append(column) # Drop the non-numeric columns from the DataFrame df_out = df.drop(columns=non_numeric_columns) return df_out -def make_cloud(path, path_out='cloud_out', sep=',', position=0, exl=False, heady=False): +def make_cloud(path, path_out="cloud_out", sep=",", position=0, exl=False, heady=False, + random_state=21, output=False,): """ Featurize SMILES strings, perform dimensionality reduction using UMAP and save the resulting chemical point cloud. Mordred descriptors that result in errors @@ -50,52 +51,64 @@ def make_cloud(path, path_out='cloud_out', sep=',', position=0, exl=False, heady position : int, default=0 The index of the column containing the SMILES strings. Default is 0. - exl : bool, default=False + exl : bool, default=False Flag indicating whether the input file is an Excel sheet. heady : bool, default=False Indicates if there is a header in the input SMILES file. + random_state : int, default=21 + Seed used as random_state for UMAP. + + output : bool, default=False + Indicates if the function return the point clouds (besides saving to disk). + Returns ------- cloud : numpy.ndarray of shape (N, 3) - Coordinates for the resulting chemical point cloud (UMAP projection). + Coordinates for the resulting chemical point cloud (UMAP projection). """ - print('Loading Scikit-learn, RDKit, and Mordred...') + print("Loading Scikit-learn, RDKit, and Mordred...") from mordred import Calculator, descriptors from rdkit import Chem import sklearn - print('Finished loading dependencies, featurizing SMILES...') + + print("Finished loading dependencies, featurizing SMILES...") scaler = sklearn.preprocessing.StandardScaler() calc = Calculator(descriptors, ignore_3D=True) - print('Loading SMILES...') + print("Loading SMILES...") if heady: head = 0 else: head = None if exl: - smiles = pd.read_excel(path,header=head) + smiles = pd.read_excel(path, header=head) else: - smiles = pd.read_csv(path,sep=sep,header=head,engine='python') - smiles = smiles.to_numpy()[:,position] - print('Converting to Mols...') + smiles = pd.read_csv(path, sep=sep, header=head, engine="python") + smiles = smiles.to_numpy()[:, position] + print("Converting to Mols...") mols = [Chem.MolFromSmiles(smi) for smi in smiles] - print('Calculating 2D descriptors from Mols...') + print("Calculating 2D descriptors from Mols...") df = calc.pandas(mols) df = drop_non_numeric_columns(df) - new_column = pd.Series(smiles, name='SMILES') + new_column = pd.Series(smiles, name="SMILES") s_vals = scaler.fit_transform(df.values) print("Finished 2D descriptor calculations.") print("Loading UMAP and embedding chemical point cloud...") from umap import UMAP - reducer = UMAP(n_components=3, min_dist=.1, spread=.5, n_neighbors=15) + + reducer = UMAP(n_components=3, min_dist=0.1, spread=0.5, n_neighbors=15, + random_state=random_state) cloud = reducer.fit_transform(s_vals) - np.save(path_out,cloud) - df.insert(0, 'SMILES', new_column) - df.to_csv(f'{path_out}.csv',index=False) + np.save(path_out, cloud) + df.insert(0, "SMILES", new_column) + df.to_csv(f"{path_out}.csv", index=False) print(f"Done! Saved chemical point cloud at '{path_out}.npy'.") print(f"Saved 2D descriptors at '{path_out}.csv'.") + return cloud if output else None + + def see_cloud(f_out, points, save=False): """ Visualize the chemical point cloud in a 3D scatter plot. @@ -103,27 +116,37 @@ def see_cloud(f_out, points, save=False): Parameters ---------- f_out (str): The output file name or path (excluding file extension) for the plot. - + points : numpy.ndarray of shape (N, 3) The array of points representing the chemical point cloud. - + save : bool, default=False Flag indicating whether to save the plot. Default is False. """ import matplotlib.pyplot as plt + fig = plt.figure() - ax = fig.add_subplot(111, projection='3d') - ax.scatter(points[:, 0], points[:, 1], points[:, 2],s=.1,c='b') - ax.set_xlabel('UMAP_1') + ax = fig.add_subplot(111, projection="3d") + ax.scatter(points[:, 0], points[:, 1], points[:, 2], s=0.1, c="b") + ax.set_xlabel("UMAP_1") ax.set_ylabel("UMAP_2") ax.set_zlabel("UMAP_3") plt.tight_layout() if save: - plt.savefig(f'{f_out}.svg') + plt.savefig(f"{f_out}.svg") plt.show() -def save_cloud(smiles_loc, f_out, points='chem_cloud.npy', sep=',', - position=0, indx=None, exl=False, heady=False): + +def save_cloud( + smiles_loc, + f_out, + points="chem_cloud.npy", + sep=",", + position=0, + indx=None, + exl=False, + heady=False, +): """ Save the chemical point cloud with corresponding SMILES strings to a human readable file. @@ -141,10 +164,10 @@ def save_cloud(smiles_loc, f_out, points='chem_cloud.npy', sep=',', sep : str, default=',' The delimitor to be used when parsing the file with SMILES. - + position : int, default=0 The index location to be used when reading SMILES. - + indx : str or None, default=None The .npy file path/name of the downsampling indexs. If the '-d' or '--down' flags were not used for the comand line, or @@ -153,7 +176,7 @@ def save_cloud(smiles_loc, f_out, points='chem_cloud.npy', sep=',', exl : bool, default=False Flag indicating wether the input file is an Excel sheet. If set to True, the resulting output file will aslo be an Excel sheet. - + heady : bool, default=False Flag to indicate if the SMILES file contains a header line or not. @@ -169,25 +192,30 @@ def save_cloud(smiles_loc, f_out, points='chem_cloud.npy', sep=',', head = None if exl: try: - smiles = pd.read_excel(smiles_loc,header=head) + smiles = pd.read_excel(smiles_loc, header=head) except Exception as error: - raise ValueError(f"Issue loading '{smiles_loc}'. \ + raise ValueError( + f"Issue loading '{smiles_loc}'. \ If using the excel flag, make sure\ - you are also loading excel sheets.") from error + you are also loading excel sheets." + ) from error else: - smiles = pd.read_csv(smiles_loc,sep=sep,header=head,engine='python') - smiles = smiles.to_numpy()[:,position] + smiles = pd.read_csv(smiles_loc, sep=sep, header=head, engine="python") + smiles = smiles.to_numpy()[:, position] points = np.load(points) if indx is not None: smiles = smiles[indx] points = points[indx] - sheet = pd.DataFrame({'SMILES': smiles, - 'UMAP_1': points[:,0], - 'UMAP_2': points[:,1], - 'UMAP_3': points[:,2] - }) + sheet = pd.DataFrame( + { + "SMILES": smiles, + "UMAP_1": points[:, 0], + "UMAP_2": points[:, 1], + "UMAP_3": points[:, 2], + } + ) if exl: - sheet.to_excel(f_out,index=False) + sheet.to_excel(f_out, index=False) else: - sheet.to_csv(f_out,index=False) - print(f'Completed with no errors, wrote results to {f_out}') + sheet.to_csv(f_out, index=False) + print(f"Completed with no errors, wrote results to {f_out}")