234 changes: 159 additions & 75 deletions modules/performUQ/common/adaptive_doe.py
@@ -39,26 +39,95 @@ def __init__(self, gp_model):
def _scale(self, x):
return self.gp_model.apply_input_scaling(x, fit=False)

def _safe_normalize(self, v: np.ndarray) -> np.ndarray:
s = float(np.nansum(v))
if not np.isfinite(s) or s <= 0:
return np.full_like(v, 1.0 / max(len(v), 1))
return v / s

def _compute_doe_weights(self) -> tuple[np.ndarray, dict]:
"""
Compute weights for aggregating across GP output components.

Return (weights, meta) for aggregating across GP output components:
- PCA on -> eigenvalue weights (explained_variance_ratio)
- PCA off -> per-output variance in processed training space.
"""
n_models = len(self.gp_model.model)

if self.gp_model.use_pca:
pinfo = self.gp_model.pca_info
ncomp = int(pinfo['n_components'])
evr = np.asarray(pinfo['explained_variance_ratio'])[:ncomp].astype(float)
w = self._safe_normalize(evr)
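# Illustrative (hypothetical numbers): explained_variance_ratio
# [0.70, 0.20, 0.05] over 3 retained components normalizes to weights
# [0.7368, 0.2105, 0.0526].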
if len(w) != n_models: # defensive
w = np.full(n_models, 1.0 / max(n_models, 1))
basis = 'uniform_fallback'
basis_vals = np.ones(n_models, dtype=float)
else:
basis = 'pca_explained_variance_ratio'
basis_vals = evr
return w, {
'mode': 'pca',
'basis': basis,
'basis_values': basis_vals.tolist(),
'sum': float(np.sum(w)),
'count': int(len(w)),
}

# no PCA: weight by per-output variance in the space the GP trained on
Y = getattr(self.gp_model, 'y_train', None) # noqa: N806
if Y is None or Y.size == 0:
w = np.full(n_models, 1.0 / max(n_models, 1))
return w, {
'mode': 'no_pca',
'basis': 'uniform_no_training_data',
'basis_values': [1.0] * n_models,
'sum': float(np.sum(w)),
'count': int(len(w)),
}

if self.gp_model.output_scaler is not None:
Y_proc = self.gp_model.output_scaler.transform(Y) # noqa: N806
else:
Y_proc = Y # noqa: N806

raw = np.nanvar(Y_proc, axis=0, ddof=1).astype(float)
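# Guard: non-finite or zero variances (e.g., a constant output column)
# are replaced by 1.0 so degenerate outputs fall back toward a uniform
# share instead of zeroing out the weights.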
raw = np.where(np.isfinite(raw) & (raw > 0), raw, 1.0)
w = self._safe_normalize(raw)
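# Worked example (hypothetical numbers): per-output variances
# [4.0, 1.0, 1.0] normalize to weights [2/3, 1/6, 1/6].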

if len(w) != n_models: # defensive
if len(w) > n_models:
basis_vals = raw[:n_models]
w = self._safe_normalize(basis_vals)
basis = 'output_variance_truncated'
else:
pad = np.full(
n_models - len(w), float(np.mean(raw)) if raw.size else 1.0
)
basis_vals = np.concatenate([raw, pad])
w = self._safe_normalize(basis_vals)
basis = 'output_variance_padded'
else:
basis = 'output_variance'
basis_vals = raw

return w, {
'mode': 'no_pca',
'basis': basis,
'basis_values': basis_vals.tolist(),
'sum': float(np.sum(w)),
'count': int(len(w)),
}

def _hyperparameters_for_doe(self):
"""
Compute the weighted average of kernel hyperparameters for DoE.

Weights come from _compute_doe_weights(): normalized explained-variance
ratios when PCA is used, per-output training variances otherwise.
"""
w, _ = self._compute_doe_weights()

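# Stack each output model's kernel hyperparameter vector; the weighted
# average below uses the aggregation weights computed above.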
hyperparameters_matrix = [
np.atleast_2d(model.kern.param_array) for model in self.gp_model.model
@@ -108,7 +177,7 @@ def select_training_points(
seed=None,
):
"""
Efficient sequential DoE using MSEw (Eq. 10) or IMSEw (Eq. 9) of
Taflanidis et al. (https://doi.org/10.1016/j.ymssp.2025.113014).

Parameters
----------
@@ -119,9 +188,9 @@
mci_samples : array-like
Monte Carlo integration samples (original space).
use_mse_w : bool
Whether to use MSEw (Eq. 10) if True, or IMSEw (Eq. 9) if False.
weights : array-like or None
Optional importance weights for the integration points.
n_samples : int
Number of candidate points to generate (IMSEw only).
seed : int or None
@@ -139,121 +208,136 @@

selected_points = []

# 1. Set up the candidate pool based on the acquisition function
if use_mse_w:
# MSEw (Eq. 10): candidates are the integration points themselves
mci_samples, weights = remove_duplicate_inputs(
mci_samples,
weights
if weights is not None
else np.ones((mci_samples.shape[0], 1)),
)
candidate_pool = mci_samples.copy()
else:
# IMSEw (equation 9): generate separate candidate pool via LHS
bounds = compute_lhs_bounds(x_train, mci_samples, padding=0)
candidate_pool = generate_lhs_candidates(n_samples, bounds, seed=seed)
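# The LHS pool spans the joint bounding box of the current training
# inputs and the integration samples (padding=0), keeping candidates
# inside the region that contributes to the integral.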

# 2. Sequential selection
for _ in range(n_points):
# Scale all arrays
scaled_x_train = self._scale(x_train)
scaled_candidates = self._scale(candidate_pool)
scaled_mci_samples = self._scale(mci_samples)

# Set GP with current training set
self.gp_model_for_doe.set_XY(
scaled_x_train, np.zeros((scaled_x_train.shape[0], 1))
)
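# The predictive variance of a GP depends only on the input locations
# and kernel hyperparameters, not on the targets, so zeros are supplied
# as placeholder outputs here.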

# Compute acquisition values based on selected method
if use_mse_w:
# MSEw (Eq. 10): σ²(θ_new|Θ^(k))
_, pred_var = self.gp_model_for_doe.predict(scaled_candidates)

# Apply importance weights if provided
if weights is not None:
pred_var *= weights.reshape(-1, 1)

acquisition_values = pred_var
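# Because the candidates are the integration points themselves, taking
# the argmax of pred_var is the (weighted) pointwise MSE criterion.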

else:
# IMSEw (equation 9): (1/N_s) * Σ[R(θ^(l), θ_new)^β * σ²(θ^(l)|Θ^(k))]

# Predict variance at integration points
_, pred_var = self.gp_model_for_doe.predict(scaled_mci_samples)

# Apply importance weights to integration points
if weights is not None:
pred_var *= weights.reshape(-1, 1)

# Compute covariance K(θ^(l), θ_new)
k_matrix = self.gp_model_for_doe.kern.K(
scaled_mci_samples, scaled_candidates
)

# Get diagonal kernel values for covariance normalization
k_diag_mci = np.diag(
self.gp_model_for_doe.kern.K(scaled_mci_samples)
)
k_diag_candidates = np.diag(
self.gp_model_for_doe.kern.K(scaled_candidates)
)

# Correlations: R(θ^(l), θ_new)
corr_matrix = k_matrix / np.sqrt(
np.outer(k_diag_mci, k_diag_candidates)
)

# Apply beta exponent
beta = 2.0 * scaled_x_train.shape[1]
# beta = 1.0 * scaled_x_train.shape[1]
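# Since R is in [0, 1], a larger β makes R^β decay faster with distance,
# concentrating the acquisition on candidates close to high-variance
# integration points; the commented-out line is a milder alternative.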

# IMSEw computation: (corr^beta).T @ pred_var / N_s
weighted_var = (corr_matrix**beta).T @ pred_var
acquisition_values = weighted_var / scaled_mci_samples.shape[0]
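# weighted_var has shape (n_candidates, 1); dividing by the number of
# integration points makes this a Monte Carlo average over θ^(l).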

# Select candidate with maximum acquisition value
idx = np.argmax(acquisition_values)
next_point = candidate_pool[idx]
selected_points.append(next_point)

# Update training set and candidate pool
x_train = np.vstack([x_train, next_point])
candidate_pool = np.delete(candidate_pool, idx, axis=0)

if use_mse_w:
# For MSE: maintain alignment by deleting from mci_samples too
mci_samples = np.delete(mci_samples, idx, axis=0)
if weights is not None:
weights = np.delete(weights, idx, axis=0)
# For IMSEw: keep mci_samples constant (integration domain unchanged)

return np.array(selected_points)
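# Usage sketch (hypothetical names and shapes, not from this module):
# given an instance `doe`, training inputs `x_train` (n, d), and
# integration samples `mci` (N, d):
#     new_pts = doe.select_training_points(
#         x_train, n_points=5, mci_samples=mci, use_mse_w=False,
#         n_samples=2000, seed=42,
#     )
#     x_train = np.vstack([x_train, new_pts])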

def write_gp_for_doe_to_json(self, filepath: str | Path):
"""
Write DoE GP kernel hyperparameters and contributing model param_arrays to JSON.

Parameters
----------
filepath : str or Path
Output file path.
"""
"""Write DoE GP kernel hyperparameters and contributing model param_arrays to JSON."""
if not hasattr(self, 'gp_model_for_doe'):
msg = 'gp_model_for_doe has not been initialized.'
raise RuntimeError(msg)

# Ensure we have the current weighted vector available
if not hasattr(self, 'weighted_hyperparameters'):
_ = self._hyperparameters_for_doe()

# Reuse the exact same weights/provenance used for aggregation
weights, weights_meta = self._compute_doe_weights()

# Detailed hyperparameters for the DoE model
kernel = self.gp_model_for_doe.kern
doe_hyperparams = {
p.name: {
'value': p.values.tolist() if p.size > 1 else float(p.values),  # noqa: PD011
'shape': p.shape,
}
for p in kernel.parameters
}

# Contributing models' param_arrays only
contributing_param_arrays = [
gp.kern.param_array.tolist() for gp in self.gp_model.model
]
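# These are GPy's flattened hyperparameter vectors, one per output GP,
# recorded alongside the weighted aggregate for traceability.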

# Output structure
output = {
'doe_kernel_type': kernel.name,
'doe_ARD': getattr(kernel, 'ARD', None),
'doe_hyperparameters': doe_hyperparams,
'weighted_param_array': self.weighted_hyperparameters.tolist(),
'aggregation_weights': weights.tolist(),
'aggregation_weights_meta': weights_meta, # provenance for reproducibility
'contributing_param_arrays': contributing_param_arrays,
}

# Write JSON file
path = Path(filepath)
path.parent.mkdir(parents=True, exist_ok=True)
with path.open('w') as f:
json.dump(uq_utilities.make_json_serializable(output), f, indent=4)

