From c6e0cd33fc635ba95d81675ada731fd9bb22c81d Mon Sep 17 00:00:00 2001 From: FJDorfner Date: Mon, 10 Feb 2025 11:53:25 -0500 Subject: [PATCH] New Drift Calculators and Sampler for MMC Plus --- src/model_drift/drift/__init__.py | 6 +- src/model_drift/drift/categorical.py | 126 +++++++++++++++++++++++++- src/model_drift/drift/numeric.py | 128 ++++++++++++++++++++++++++- src/model_drift/drift/performance.py | 53 ++++++++++- src/model_drift/drift/sampler.py | 20 +++++ 5 files changed, 325 insertions(+), 8 deletions(-) diff --git a/src/model_drift/drift/__init__.py b/src/model_drift/drift/__init__.py index 9caded5..b1d2db9 100644 --- a/src/model_drift/drift/__init__.py +++ b/src/model_drift/drift/__init__.py @@ -3,10 +3,10 @@ # Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. # ------------------------------------------------------------------------------------------ from .base import BaseDriftCalculator # noqa -from .categorical import ChiSqDriftCalculator # noqa +from .categorical import ChiSqDriftCalculator, ChiSqDriftCalculatorJackKnife, HellingerDriftCalculatorJackKnife # noqa from .histogram import HistIntersectionCalculator, KdeHistPlotCalculator # noqa from .collection import DriftCollectionCalculator # noqa -from .numeric import KSDriftCalculator, BasicDriftCalculator # noqa +from .numeric import KSDriftCalculator, BasicDriftCalculator, KSDriftCalculatorJackKnife, EMDDriftCalculatorJackKnife_1D, EMDDriftCalculatorJackKnife_woRef # noqa from .performance import AUROCCalculator, ClassificationReportCalculator # noqa from .sampler import Sampler # noqa -from .tabular import TabularDriftCalculator # noqa +from .tabular import TabularDriftCalculator # noqa \ No newline at end of file diff --git a/src/model_drift/drift/categorical.py b/src/model_drift/drift/categorical.py index 37beb98..f80010a 100644 --- a/src/model_drift/drift/categorical.py +++ b/src/model_drift/drift/categorical.py @@ -3,13 +3,13 @@ # Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. # ------------------------------------------------------------------------------------------ from collections import Counter +import math import numpy as np from scipy.stats import chi2_contingency, chi2 from model_drift.drift.base import BaseDriftCalculator - def merge_freqs(ref_counts, sample): sample_counts = Counter(sample) keys = set().union(ref_counts, sample_counts) @@ -17,6 +17,11 @@ def merge_freqs(ref_counts, sample): obs = np.array([sample_counts.get(k, 0) for k in keys]) return exp, keys, obs +def hellinger_distance(p, q): + """Hellinger distance between two discrete distributions. + """ + return math.sqrt(sum([ (math.sqrt(p_i) - math.sqrt(q_i))**2 for p_i, q_i in zip(p, q) ]) / 2) + class ChiSqDriftCalculator(BaseDriftCalculator): name = "chi2" @@ -53,3 +58,122 @@ def _predict(self, sample): out['critical_diff'] = out['distance'] - out['critical_value'] return out + + + +class ChiSqDriftCalculatorJackKnife(BaseDriftCalculator): + name = "chi2_jackknife" + + def __init__(self, q_val=0.1, correction=True, lambda_=None, use_freq=False, include_critical_values=False, + **kwargs): + super().__init__(**kwargs) + self.q_val = q_val + self.correction = correction + self.lambda_ = lambda_ + self.use_freq = use_freq + self.include_critical_values = include_critical_values + + def convert(self, arg): + return arg.apply(str) + + def prepare(self, ref, **kwargs): + self._ref_counts = Counter(ref) + super().prepare(ref) + + def _predict(self, sample): + + nref = len(self._ref) + nobs = len(sample) + + ref1 = np.random.choice(self._ref, nobs) + ref2 = np.random.choice(self._ref, nobs) + + ref1_counts = Counter(ref1) + ref2_counts = Counter(ref2) + + exp_ref1_sam, keys, obs_ref1_sam = merge_freqs(ref1_counts, sample) + + exp_ref1_ref2, keys, obs_ref1_ref2 = merge_freqs(ref1_counts, ref2_counts) + + if self.use_freq: + exp_ref1_sam = exp_ref1_sam / exp_ref1_sam.sum() + obs_ref1_sam = obs_ref1_sam / obs_ref1_sam.sum() + + exp_ref1_ref2 = exp_ref1_ref2 / exp_ref1_ref2.sum() + obs_ref1_ref2 = obs_ref1_ref2 / obs_ref1_ref2.sum() + + + out = {} + + dist1, _, _, _ = chi2_contingency(np.vstack([exp_ref1_sam, obs_ref1_sam]), + correction=self.correction, + lambda_=self.lambda_) + + dist2, _, _, _ = chi2_contingency(np.vstack([exp_ref1_ref2, obs_ref1_ref2]), + correction=self.correction, + lambda_=self.lambda_) + + out["distance"] = max(dist1 - dist2, 0.0) + out['pval'] = float("NaN") + + + if self.include_critical_values: + raise NotImplementedError("Critical value not implemented for jackknife") + + return out + +class HellingerDriftCalculatorJackKnife(BaseDriftCalculator): + name = "hellinger_jackknife" + + def __init__(self, q_val=0.1, correction=True, lambda_=None, use_freq=True, include_critical_values=False, + **kwargs): + super().__init__(**kwargs) + self.q_val = q_val + self.correction = correction + self.lambda_ = lambda_ + self.use_freq = use_freq + self.include_critical_values = include_critical_values + + def convert(self, arg): + return arg.apply(str) + + def prepare(self, ref, **kwargs): + self._ref_counts = Counter(ref) + super().prepare(ref) + + def _predict(self, sample): + + nref = len(self._ref) + nobs = len(sample) + + ref1 = np.random.choice(self._ref, nobs) + ref2 = np.random.choice(self._ref, nobs) + + ref1_counts = Counter(ref1) + ref2_counts = Counter(ref2) + + exp_ref1_sam, keys, obs_ref1_sam = merge_freqs(ref1_counts, sample) + + exp_ref1_ref2, keys, obs_ref1_ref2 = merge_freqs(ref1_counts, ref2_counts) + + if self.use_freq: + exp_ref1_sam = exp_ref1_sam / exp_ref1_sam.sum() + obs_ref1_sam = obs_ref1_sam / obs_ref1_sam.sum() + + exp_ref1_ref2 = exp_ref1_ref2 / exp_ref1_ref2.sum() + obs_ref1_ref2 = obs_ref1_ref2 / obs_ref1_ref2.sum() + + out = {} + + dist1 = hellinger_distance(exp_ref1_sam, obs_ref1_sam) + dist2 = hellinger_distance(exp_ref1_ref2, obs_ref1_ref2) + + + out["distance"] = max(dist1 - dist2, 0.0) + out['pval'] = float("NaN") + + + if self.include_critical_values: + raise NotImplementedError("Critical value not implemented for jackknife") + + return out \ No newline at end of file diff --git a/src/model_drift/drift/numeric.py b/src/model_drift/drift/numeric.py index c10d871..53bf5c1 100644 --- a/src/model_drift/drift/numeric.py +++ b/src/model_drift/drift/numeric.py @@ -5,7 +5,8 @@ import numpy as np import pandas as pd from scipy.special import kolmogi -from scipy.stats import ks_2samp +from scipy.stats import ks_2samp, wasserstein_distance +import ot from model_drift.drift.base import BaseDriftCalculator @@ -48,6 +49,131 @@ def calc_critical_value(n1, n2, q=.01): return kolmogi(q) * np.sqrt((n1 + n2) / (n1 * n2)) +class KSDriftCalculatorJackKnife(NumericBaseDriftCalculator): + name = "ks_jackknife" + + def __init__(self, q_val=0.1, alternative='two-sided', mode='asymp', average='macro', include_critical_value=False, + **kwargs): + super().__init__(**kwargs) + self.q_val = q_val + self.alternative = alternative + self.mode = mode + self.average = average + self.include_critical_value = include_critical_value + + def _predict(self, sample): + nref = len(self._ref) + nobs = len(sample) + + ref1 = np.random.choice(self._ref, nobs) + ref2 = np.random.choice(self._ref, nobs) + out = {} + try: + dist1, _ = ks_2samp(ref1, sample, alternative=self.alternative, mode=self.mode) + dist2, _ = ks_2samp(ref1, ref2, alternative=self.alternative, mode=self.mode) + + out["distance"] = max(dist1 - dist2, 0.0) + out['pval'] = float("NaN") + + except TypeError: + out["distance"], out['pval'] = float("NaN"), float("NaN") + + if self.include_critical_value: + raise NotImplementedError("Critical value not implemented for jackknife") + + return out + + +class EMDDriftCalculatorJackKnife_woRef(NumericBaseDriftCalculator): + name = "emd_jackknife" + + def __init__(self, include_critical_value=False, **kwargs): + super().__init__(**kwargs) + self.include_critical_value = include_critical_value + + def convert(self, arg): + return arg + + def _predict(self, sample): + + def _emd_distance(tar, ref): + a = np.ones((len(ref))) / len(ref) # Uniform weights for the reference set + b = np.ones((len(tar))) / len(tar) # Uniform weights for the target set + M = ot.dist(ref, tar) + G0 = ot.emd(a, b, M, numItermax=1000000) + em_distance = np.sum(M * G0) + return em_distance + + nref = len(self._ref) + nobs = len(sample) + + sample_tuples = sample.apply(tuple) + ref_tuples = self._ref.apply(tuple) + ref_tuples_exclusive = ref_tuples[~ref_tuples.isin(sample_tuples)] + ref_lists_exclusive = ref_tuples_exclusive.apply(list) + + ref1 = np.random.choice(ref_lists_exclusive, nobs) + ref2 = np.random.choice(ref_lists_exclusive, nobs) + + sample_arr = np.array(sample.tolist()) + sample_arr = np.nan_to_num(sample_arr, nan=0.0, posinf=0.0, neginf=0.0) + + ref1_arr = np.array(ref1.tolist()) + ref1_arr = np.nan_to_num(ref1_arr, nan=0.0, posinf=0.0, neginf=0.0) + + ref2_arr = np.array(ref2.tolist()) + ref2_arr = np.nan_to_num(ref2_arr, nan=0.0, posinf=0.0, neginf=0.0) + + out = {} + try: + dist1 = _emd_distance(ref1_arr, sample_arr) + dist2 = _emd_distance(ref1_arr, ref2_arr) + + out["distance"] = max(dist1 - dist2, 0.0) + out['pval'] = float("NaN") + + except TypeError: + out["distance"], out['pval'] = float("NaN"), float("NaN") + + if self.include_critical_value: + raise NotImplementedError("Critical value not implemented for jackknife") + + return out + +class EMDDriftCalculatorJackKnife_1D(NumericBaseDriftCalculator): + name = "emd_jackknife_1d" + + def __init__(self, include_critical_value=False, **kwargs): + super().__init__(**kwargs) + self.include_critical_value = include_critical_value + + def _predict(self, sample): + nref = len(self._ref) + nobs = len(sample) + + ref1 = np.random.choice(self._ref, nobs) + ref2 = np.random.choice(self._ref, nobs) + out = {} + # drop NaNs from the arrays before computing the EMD + ref1 = ref1[~np.isnan(ref1)] + ref2 = ref2[~np.isnan(ref2)] + sample = sample[~np.isnan(sample)] + try: + dist1 = wasserstein_distance(ref1, sample) + dist2 = wasserstein_distance(ref1, ref2) + + out["distance"] = max(dist1 - dist2, 0.0) + out['pval'] = float("NaN") + + except TypeError: + out["distance"], out['pval'] = float("NaN"), float("NaN") + + if self.include_critical_value: + raise NotImplementedError("Critical value not implemented for jackknife") + + return out + + class BasicDriftCalculator(NumericBaseDriftCalculator): name = "stats" diff --git a/src/model_drift/drift/performance.py b/src/model_drift/drift/performance.py index 0c10668..46a42a2 100644 --- a/src/model_drift/drift/performance.py +++ b/src/model_drift/drift/performance.py @@ -47,14 +47,42 @@ def macro_auc(scores, labels, skip_missing=True): def micro_auc(scores, labels): return float(auroc(torch.tensor(scores), torch.tensor(labels).long(), average='micro').numpy()) +def youden_point(df, target_names): + """ + Calculate the Youden Index for each disease and return that as optimal operating point for the disease. + """ + operating_points = {} + for disease in target_names: + fpr, tpr, thresholds = metrics.roc_curve(df[f'label.{disease}'], df[f'activation.{disease}']) -def classification_report(scores, labels, target_names=None, th=0.5, ): + # Calculate Youden Index + idx = np.argmax(tpr - fpr) + best_cutoff = thresholds[idx] + + operating_points[disease] = format(best_cutoff, ".4f") + + return operating_points + + +def classification_report(scores, labels, target_names=None, th=None): keeps = (labels.sum(axis=0) > 0) if target_names is None: target_names = [str(i) for i in range(scores.shape[1])] target_names = np.array(target_names) - output = metrics.classification_report(labels, scores >= th, target_names=target_names, output_dict=True) + + if not isinstance(th, dict): + raise ValueError("Thresholds (th) must be provided as a dictionary with target names as keys.") + + #binarize scores according to their thresholds + binary_scores = np.zeros_like(scores, dtype=bool) + for i, target in enumerate(target_names): + if target in th: + binary_scores[:, i] = scores[:, i] >= float(th[target]) + else: + raise KeyError(f"No threshold provided for target '{target}'.") + + output = metrics.classification_report(labels, binary_scores, target_names=target_names, output_dict=True) for i, k in enumerate(target_names): if keeps[i] == 0: continue @@ -99,12 +127,31 @@ def _predict(self, sample): class ClassificationReportCalculator(BaseDriftCalculator): name = "class_report" - def __init__(self, label_col=None, score_col=None, target_names=None, th=0.5): + def __init__(self, label_col=None, score_col=None, target_names=None, th=None): super().__init__() self.label_col = label_col self.score_col = score_col self.target_names = target_names self.th = th + + def prepare(self, ref): + self._ref = self.convert(ref) + if 'activation' in self._ref and 'label' in self._ref: + + activations_df = pd.DataFrame(self._ref['activation'].tolist(), index=self._ref.index) + labels_df = pd.DataFrame(self._ref['label'].tolist(), index=self._ref.index) + combined_df = pd.concat([activations_df, labels_df], axis=1) + + combined_df.columns = ['activation.' + str(k) for i, k in enumerate(self.target_names)] + \ + ['label.' + str(k) for i, k in enumerate(self.target_names)] + + self._ref_df = combined_df + self._is_prepared = True + else: + print("Error: 'ref' does not contain required 'activation' and 'label' columns.") + + self.th = youden_point(self._ref_df, self.target_names) + self._is_prepared = True def convert(self, arg): if not isinstance(arg, pd.DataFrame): diff --git a/src/model_drift/drift/sampler.py b/src/model_drift/drift/sampler.py index e8bf7e6..1682002 100644 --- a/src/model_drift/drift/sampler.py +++ b/src/model_drift/drift/sampler.py @@ -24,3 +24,23 @@ def sample(self, sample, stratify=None): def sample_iterator(self, sample, n_samples=1, stratify=None): for _ in range(n_samples): yield self.sample(sample, stratify=stratify) + +class DummySampler(object): + """ + This is a dummy sampler that mimics the behavior of the Sampler class, but does + not perform any samples and instead returns the indicies unchanged. This is used + for the JackKnife resampling, in which the reference dataframe is resampled in the + metric itself and the sliding window sample is not resampled at all. In this case + we still want to create multiple copies of the sample window. + """ + def __init__(self, sample_size=0, replacement=True, random_state=None): + # These variables are all dummy variables, just kept for future + # compatibility. + self.sample_size = sample_size + self.replacement = replacement + self.random_state = random_state + + def sample_iterator(self, indices, n_samples=1, stratify=None): + + for _ in range(n_samples): + yield indices