From 426a8724dc95029b1a613cd26fa8521c13f682f8 Mon Sep 17 00:00:00 2001 From: kohlhaasrebecca <rebecca.kohlhaas@outlook.com> Date: Wed, 13 Nov 2024 16:14:19 +0100 Subject: [PATCH] Create PCE.calculate_sobol() --- .../example_analytical_function.py | 2 + .../surrogate_models/polynomial_chaos.py | 413 ++++++++++++++++++ 2 files changed, 415 insertions(+) diff --git a/examples/analytical-function/example_analytical_function.py b/examples/analytical-function/example_analytical_function.py index 76eb16b7e..481fcfcab 100644 --- a/examples/analytical-function/example_analytical_function.py +++ b/examples/analytical-function/example_analytical_function.py @@ -220,6 +220,8 @@ if __name__ == "__main__": #engine.train_sequential() engine.train_normal() + engine.MetaModel.calculate_sobol() + # Load the objects # with open(f"PCEModel_{Model.name}.pkl", "rb") as input: # PCEModel = joblib.load(input) diff --git a/src/bayesvalidrox/surrogate_models/polynomial_chaos.py b/src/bayesvalidrox/surrogate_models/polynomial_chaos.py index 25e5ecf83..592652e2a 100644 --- a/src/bayesvalidrox/surrogate_models/polynomial_chaos.py +++ b/src/bayesvalidrox/surrogate_models/polynomial_chaos.py @@ -9,10 +9,12 @@ import os import warnings import functools +import math import matplotlib.pyplot as plt import numpy as np import sklearn.linear_model as lm from joblib import Parallel, delayed +from itertools import combinations from tqdm import tqdm from .apoly_construction import apoly_construction @@ -927,3 +929,414 @@ class PCE(MetaModel): print('-'*40) return pce_means, pce_stds + + def calculate_sobol(self): + """ + Calculates Sobol' indices of all possible orders and the Total Sobol' indices + based on the coefficients in the PCE. + + Parameters + ---------- + None. + + Returns + ------- + sobol + totalsobol + + """ + # Extract the necessary variables + max_order = np.max(self._pce_deg) # TODO : simplify + sobol_cell_b = {} + total_sobol_b = {} + n_params = len(self.input_obj.Marginals) + cov_Z_p_q = np.zeros((n_params)) + outputs = self.out_names + + for b_i in range(self.n_bootstrap_itrs): + sobol_cell_, total_sobol_ = {}, {} + + for output in outputs: + n_meas_points = len(self._coeffs_dict[f"b_{b_i+1}"][output]) + + # Initialize the (cell) array containing the (total) Sobol indices. + sobol_array = dict.fromkeys(range(1, max_order + 1), []) + sobol_cell_array = dict.fromkeys(range(1, max_order + 1), []) + + for i_order in range(1, max_order + 1): + n_comb = math.comb(n_params, i_order) + + sobol_cell_array[i_order] = np.zeros((n_comb, n_meas_points)) + + total_sobol_array = np.zeros((n_params, n_meas_points)) + + # Initialize the cell to store the names of the variables + TotalVariance = np.zeros((n_meas_points)) + # Loop over all measurement points and calculate sobol indices + for pIdx in range(n_meas_points): + # Extract the basis indices (alpha) and coefficients + Basis = self._basis_dict[f"b_{b_i+1}"][output][f"y_{pIdx+1}"] + + try: + clf_poly = self.clf_poly[f"b_{b_i+1}"][output][ + f"y_{pIdx+1}" + ] + PCECoeffs = clf_poly.coef_ + except: + PCECoeffs = self._coeffs_dict[f"b_{b_i+1}"][output][f"y_{pIdx+1}"] + + # Compute total variance + TotalVariance[pIdx] = np.sum(np.square(PCECoeffs[1:])) + + nzidx = np.where(PCECoeffs != 0)[0] + # Set all the Sobol indices equal to zero in the presence of a + # null output. + if len(nzidx) == 0: + # This is buggy. + for i_order in range(1, max_order + 1): + sobol_cell_array[i_order][:, pIdx] = 0 + + # Otherwise compute them by summing well-chosen coefficients + else: + nz_basis = Basis[nzidx] + for i_order in range(1, max_order + 1): + idx = np.where(np.sum(nz_basis > 0, axis=1) == i_order) + subbasis = nz_basis[idx] + Z = np.array(list(combinations(range(n_params), i_order))) + + for q in range(Z.shape[0]): + Zq = Z[q] + subsubbasis = subbasis[:, Zq] + subidx = np.prod(subsubbasis, axis=1) > 0 + sum_ind = nzidx[idx[0][subidx]] + if TotalVariance[pIdx] == 0.0: + sobol_cell_array[i_order][q, pIdx] = 0.0 + else: + sobol = np.sum(np.square(PCECoeffs[sum_ind])) + sobol /= TotalVariance[pIdx] + sobol_cell_array[i_order][q, pIdx] = sobol + + # Compute the TOTAL Sobol indices. + for ParIdx in range(n_params): + idx = nz_basis[:, ParIdx] > 0 + sum_ind = nzidx[idx] + + if TotalVariance[pIdx] == 0.0: + total_sobol_array[ParIdx, pIdx] = 0.0 + else: + sobol = np.sum(np.square(PCECoeffs[sum_ind])) + sobol /= TotalVariance[pIdx] + total_sobol_array[ParIdx, pIdx] = sobol + + # ----- if PCA selected: Compute covariance ----- + if self.dim_red_method.lower() == "pca": + # Extract the basis indices (alpha) and coefficients for + # next component + if pIdx < n_meas_points - 1: + nextBasis = self._basis_dict[f"b_{b_i+1}"][output][f"y_{pIdx+2}"] + if self.bootstrap_method != "fast" or b_i == 0: + clf_poly = self.clf_poly[f"b_{b_i+1}"][output][ + f"y_{pIdx+2}" + ] + nextPCECoeffs = clf_poly.coef_ + else: + nextPCECoeffs = self._coeffs_dict[f"b_{b_i+1}"][output][ + f"y_{pIdx+2}" + ] + + # Choose the common non-zero basis + mask = (Basis[:, None] == nextBasis).all(-1).any(-1) + n_mask = (nextBasis[:, None] == Basis).all(-1).any(-1) + + # Compute the covariance in Eq 17. + for ParIdx in range(n_params): + idx = (mask) & (Basis[:, ParIdx] > 0) + n_idx = (n_mask) & (nextBasis[:, ParIdx] > 0) + try: + cov_Z_p_q[ParIdx] += np.sum( + np.dot(PCECoeffs[idx], nextPCECoeffs[n_idx]) + ) + except: + pass + + # Compute the sobol indices according to Ref. 2 + if self.dim_red_method.lower() == "pca": + n_c_points = self.engine.ExpDesign.Y[output].shape[1] + PCA = self.pca[f"b_{b_i+1}"][output] + compPCA = PCA.components_ + nComp = compPCA.shape[0] + var_Z_p = PCA.explained_variance_ + + # Extract the sobol index of the components + for i_order in range(1, max_order + 1): + n_comb = math.comb(n_params, i_order) + sobol_array[i_order] = np.zeros((n_comb, n_c_points)) + Z = np.array(list(combinations(range(n_params), i_order))) + + # Loop over parameters + for q in range(Z.shape[0]): + S_Z_i = sobol_cell_array[i_order][q] + + for tIdx in range(n_c_points): + var_Y_t = np.var( + self.engine.ExpDesign.Y[output][:, tIdx] + ) + if var_Y_t == 0.0: + term1, term2 = 0.0, 0.0 + else: + # Eq. 17 + term1 = 0.0 + for i in range(nComp): + a = S_Z_i[i] * var_Z_p[i] + a *= compPCA[i, tIdx] ** 2 + term1 += a + + # TODO: Term 2 + # term2 = 0.0 + # for i in range(nComp-1): + # term2 += cov_Z_p_q[q] * compPCA[i, tIdx] + # term2 *= compPCA[i+1, tIdx] + # term2 *= 2 + + sobol_array[i_order][q, tIdx] = term1 # + term2 + + # Devide over total output variance Eq. 18 + sobol_array[i_order][q, tIdx] /= var_Y_t + + # Compute the TOTAL Sobol indices. + total_sobol = np.zeros((n_params, n_c_points)) + for ParIdx in range(n_params): + S_Z_i = total_sobol_array[ParIdx] + + for tIdx in range(n_c_points): + var_Y_t = np.var(self.engine.ExpDesign.Y[output][:, tIdx]) + if var_Y_t == 0.0: + term1, term2 = 0.0, 0.0 + else: + term1 = 0 + for i in range(nComp): + term1 += ( + S_Z_i[i] * var_Z_p[i] * (compPCA[i, tIdx] ** 2) + ) + + # Term 2 + term2 = 0 + for i in range(nComp - 1): + term2 += ( + cov_Z_p_q[ParIdx] + * compPCA[i, tIdx] + * compPCA[i + 1, tIdx] + ) + term2 *= 2 + + total_sobol[ParIdx, tIdx] = term1 # + term2 + + # Devide over total output variance Eq. 18 + total_sobol[ParIdx, tIdx] /= var_Y_t + + sobol_cell_[output] = sobol_array + total_sobol_[output] = total_sobol + else: + sobol_cell_[output] = sobol_cell_array + total_sobol_[output] = total_sobol_array + + # Save for each bootsrtap iteration + sobol_cell_b[b_i] = sobol_cell_ + total_sobol_b[b_i] = total_sobol_ + + # Average total sobol indices + total_sobol_all = {} + for i in sorted(total_sobol_b): + for k, v in total_sobol_b[i].items(): + if k not in total_sobol_all: + total_sobol_all[k] = [None] * len(total_sobol_b) + total_sobol_all[k][i] = v + sobol_all = {} + for i in sorted(sobol_cell_b): + for k, v in sobol_cell_b[i].items(): + for l, m in v.items(): + if l not in sobol_all: + sobol_all[l] = {} + if k not in sobol_all[l]: + sobol_all[l][k] = [None] * len(sobol_cell_b) + sobol_all[l][k][i] = v[l] + + self.sobol = {} + # Will receive a set of indices for each possible degree of polynomial/interaction + for i_order in range(1, max_order + 1): + self.sobol[i_order] = {} + for output in outputs: + self.sobol[i_order][output] = np.mean( + [sobol_all[i_order][output]], axis=0 + ) + + self.total_sobol = {} + for output in outputs: + self.total_sobol[output] = np.mean(total_sobol_all[output], axis=0) + + return self.sobol, self.total_sobol + + def plot_sobol(self): + + # ---------------- Plot ----------------------- + par_names = self.engine.ExpDesign.par_names + x_values_orig = self.engine.ExpDesign.x_values + + # Check if the x_values match the number of metamodel outputs + if ( + not np.array(x_values_orig).shape[0] + == self.total_sobol[outputs[0]].shape[1] + ): + print( + "The number of MetaModel outputs does not match the x_values" + " specified in ExpDesign. Images are created with " + "equidistant numbers on the x-axis" + ) + x_values_orig = np.arange(0, 1, self.total_sobol[output].shape[0]) + + if 1: + fig = plt.figure() + + for i_order in range(1, max_order + 1): + # Change labels to combined params sets for higher order indices + if i_order == 1: + par_names_i = par_names + else: + par_names_i = list(combinations(par_names, i_order)) + for outIdx, output in enumerate(outputs): + # Extract total Sobol indices + sobol = self.sobol[i_order][output][0] + + # Compute quantiles + q_5 = np.quantile(sobol_all[i_order][output], q=0.05, axis=0) + q_97_5 = np.quantile(sobol_all[i_order][output], q=0.975, axis=0) + + # Extract a list of x values + if type(x_values_orig) is dict: + x = x_values_orig[output] + else: + x = x_values_orig + + if plot_type == "bar": + ax = fig.add_axes([0, 0, 1, 1]) + dict1 = {xlabel: x} + dict2 = { + param: sobolIndices + for param, sobolIndices in zip(par_names_i, sobol) + } + + df = pd.DataFrame({**dict1, **dict2}) + df.plot( + x=xlabel, + y=par_names_i, + kind="bar", + ax=ax, + rot=0, + colormap="Dark2", + yerr=q_97_5 - q_5, + ) + ax.set_ylabel("Sobol indices, $S^T$") + + else: + for i, sobolIndices in enumerate(sobol): + plt.plot( + x, + sobolIndices, + label=par_names_i[i], + marker="x", + lw=2.5, + ) + plt.fill_between(x, q_5[i], q_97_5[i], alpha=0.15) + + plt.ylabel("Sobol indices, $S^T$") + plt.xlabel(xlabel) + + plt.title(f"{i_order} degree Sensitivity analysis of {output}") + if plot_type != "bar": + plt.legend(loc="best", frameon=True) + + # Save indices + np.savetxt( + f"{self.out_dir}sobol_{i_order}_" + + output.replace("/", "_") + + ".csv", + sobol.T, + delimiter=",", + header=",".join(par_names), + comments="", + ) + + # save the current figure + # print(f'{self.out_dir}/{newpath}Sobol_indices_{i_order}_{output}.pdf') + fig.savefig( + f"{self.out_dir}Sobol_indices_{i_order}_{output}.pdf", + bbox_inches="tight", + ) + + # Destroy the current plot + plt.clf() + + fig = plt.figure() + + for outIdx, output in enumerate(outputs): + # Extract total Sobol indices + total_sobol = self.total_sobol[output] + + # Compute quantiles + q_5 = np.quantile(total_sobol_all[output], q=0.05, axis=0) + q_97_5 = np.quantile(total_sobol_all[output], q=0.975, axis=0) + + # Extract a list of x values + if type(x_values_orig) is dict: + x = x_values_orig[output] + else: + x = x_values_orig + + if plot_type == "bar": + ax = fig.add_axes([0, 0, 1, 1]) + dict1 = {xlabel: x} + dict2 = { + param: sobolIndices + for param, sobolIndices in zip(par_names, total_sobol) + } + + df = pd.DataFrame({**dict1, **dict2}) + df.plot( + x=xlabel, + y=par_names, + kind="bar", + ax=ax, + rot=0, + colormap="Dark2", + yerr=q_97_5 - q_5, + ) + ax.set_ylabel("Total Sobol indices, $S^T$") + + else: + for i, sobolIndices in enumerate(total_sobol): + plt.plot(x, sobolIndices, label=par_names[i], marker="x", lw=2.5) + plt.fill_between(x, q_5[i], q_97_5[i], alpha=0.15) + + plt.ylabel("Total Sobol indices, $S^T$") + plt.xlabel(xlabel) + + plt.title(f"Sensitivity analysis of {output}") + if plot_type != "bar": + plt.legend(loc="best", frameon=True) + + # Save indices + np.savetxt( + f"{self.out_dir}totalsobol_" + output.replace("/", "_") + ".csv", + total_sobol.T, + delimiter=",", + header=",".join(par_names), + comments="", + ) + + # save the current figure + fig.savefig( + f"{self.out_dir}TotalSobol_indices_{output}.pdf", bbox_inches="tight" + ) + + # Destroy the current plot + plt.clf() -- GitLab