Module bayesvalidrox.post_processing
Expand source code
# -*- coding: utf-8 -*-
from .post_processing import PostProcessing
__all__ = [
"PostProcessing"
]
Sub-modules
bayesvalidrox.post_processing.adaptPlot
-
Created on Thu Aug 13 13:46:24 2020 …
bayesvalidrox.post_processing.post_processing
Classes
class PostProcessing (MetaModel, name='calib')
-
This class provides many helper functions to post-process the trained meta-model.
Attributes
MetaModel
:obj
- MetaModel object to do postprocessing on.
name
:str
Type of the analysis. The default is
'calib'
. If a validation is expected to be performed, change this to 'valid'
.
Expand source code
class PostProcessing:
    """
    This class provides many helper functions to post-process the trained
    meta-model.

    Attributes
    ----------
    MetaModel : obj
        MetaModel object to do postprocessing on.
    name : str
        Type of the analysis. The default is `'calib'`. If a validation is
        expected to be performed change this to `'valid'`.
    """

    def __init__(self, MetaModel, name='calib'):
        self.MetaModel = MetaModel
        self.name = name

    # -------------------------------------------------------------------------
    def plot_moments(self, xlabel='Time [s]', plot_type=None, save_fig=True):
        """
        Plots the moments in a pdf format in the directory
        `Outputs_PostProcessing`.

        Parameters
        ----------
        xlabel : str, optional
            String to be displayed as x-label. The default is `'Time [s]'`.
        plot_type : str, optional
            Options: bar or line. The default is `None`.
        save_fig : bool, optional
            Save figure or not. The default is `True`.

        Returns
        -------
        pce_means: dict
            Mean of the model outputs.
        pce_stds: dict
            Standard deviation of the model outputs.
        """
        bar_plot = True if plot_type == 'bar' else False
        meta_model_type = self.MetaModel.meta_model_type
        Model = self.MetaModel.ModelObj

        # Read Monte-Carlo reference
        self.mc_reference = Model.read_mc_reference()
        print(self.mc_reference)

        # Set the x values
        x_values_orig = self.MetaModel.ExpDesign.x_values

        # Compute the moments with the PCEModel object
        self._compute_pce_moments()

        # Get the variables
        out_names = Model.Output.names

        # Open a pdf for the plots
        if save_fig:
            newpath = (f'Outputs_PostProcessing_{self.name}/')
            if not os.path.exists(newpath):
                os.makedirs(newpath)
            # create a PdfPages object
            pdf = PdfPages(f'./{newpath}Mean_Std_PCE.pdf')

        # Plot the best fit line, set the linewidth (lw), color and
        # transparency (alpha) of the line
        for idx, key in enumerate(out_names):
            fig, ax = plt.subplots(nrows=1, ncols=2)

            # Extract mean and std
            mean_data = self.pce_means[key]
            std_data = self.pce_stds[key]

            # Extract a list of x values
            if type(x_values_orig) is dict:
                x = x_values_orig[key]
            else:
                x = x_values_orig

            # Plot: bar plot or line plot
            if bar_plot:
                ax[0].bar(list(map(str, x)), mean_data, color='b', width=0.25)
                ax[1].bar(list(map(str, x)), std_data, color='b', width=0.25)
                ax[0].legend(labels=[meta_model_type])
                ax[1].legend(labels=[meta_model_type])
            else:
                ax[0].plot(x, mean_data, lw=3, color='k', marker='x',
                           label=meta_model_type)
                ax[1].plot(x, std_data, lw=3, color='k', marker='x',
                           label=meta_model_type)

            if self.mc_reference is not None:
                if bar_plot:
                    ax[0].bar(list(map(str, x)), self.mc_reference['mean'],
                              color='r', width=0.25)
                    ax[1].bar(list(map(str, x)), self.mc_reference['std'],
                              color='r', width=0.25)
                    ax[0].legend(labels=[meta_model_type])
                    ax[1].legend(labels=[meta_model_type])
                else:
                    ax[0].plot(x, self.mc_reference['mean'], lw=3, marker='x',
                               color='r', label='Ref.')
                    ax[1].plot(x, self.mc_reference['std'], lw=3, marker='x',
                               color='r', label='Ref.')

            # Label the axes and provide a title
            ax[0].set_xlabel(xlabel)
            ax[1].set_xlabel(xlabel)
            ax[0].set_ylabel(key)
            ax[1].set_ylabel(key)

            # Provide a title
            ax[0].set_title('Mean of ' + key)
            ax[1].set_title('Std of ' + key)

            if not bar_plot:
                ax[0].legend(loc='best')
                ax[1].legend(loc='best')

            plt.tight_layout()

            if save_fig:
                # save the current figure
                pdf.savefig(fig, bbox_inches='tight')
                # Destroy the current plot
                plt.clf()

        # FIX: only close the pdf if it was opened (save_fig=True);
        # the original called pdf.close() unconditionally -> NameError.
        if save_fig:
            pdf.close()

        return self.pce_means, self.pce_stds

    # -------------------------------------------------------------------------
    def valid_metamodel(self, n_samples=1, samples=None, x_axis='Time [s]'):
        """
        Evaluates and plots the meta model and the PCEModel outputs for the
        given number of samples or the given samples.

        Parameters
        ----------
        n_samples : int, optional
            Number of samples to be evaluated. The default is 1.
        samples : array of shape (n_samples, n_params), optional
            Samples to be evaluated. The default is None.
        x_axis : str, optional
            Label of x axis. The default is `'Time [s]'`.

        Returns
        -------
        None.
        """
        MetaModel = self.MetaModel
        Model = MetaModel.ModelObj

        if samples is None:
            self.n_samples = n_samples
            samples = self._get_sample()
        else:
            self.n_samples = samples.shape[0]

        # Extract x_values
        x_values = MetaModel.ExpDesign.x_values

        self.model_out_dict = self._eval_model(samples, key_str='valid')
        self.pce_out_mean, self.pce_out_std = MetaModel.eval_metamodel(samples)

        # Pick the second output if there is one, otherwise the first
        try:
            key = Model.Output.names[1]
        except IndexError:
            key = Model.Output.names[0]

        n_obs = self.model_out_dict[key].shape[1]

        if n_obs == 1:
            self._plot_validation()
        else:
            self._plot_validation_multi(x_values=x_values, x_axis=x_axis)

    # -------------------------------------------------------------------------
    def check_accuracy(self, n_samples=None, samples=None, outputs=None):
        """
        Checks accuracy of the metamodel by computing the root mean square
        error and validation error for all outputs.

        Parameters
        ----------
        n_samples : int, optional
            Number of samples. The default is None.
        samples : array of shape (n_samples, n_params), optional
            Parameter sets to be checked. The default is None.
        outputs : dict, optional
            Output dictionary with model outputs for all given output types in
            `Model.Output.names`. The default is None.

        Raises
        ------
        Exception
            When neither n_samples nor samples are provided.

        Returns
        -------
        rmse: dict
            Root mean squared error for each output.
        valid_error : dict
            Validation error for each output.
        """
        MetaModel = self.MetaModel
        Model = MetaModel.ModelObj

        # Set the number of samples
        if n_samples:
            self.n_samples = n_samples
        elif samples is not None:
            self.n_samples = samples.shape[0]
        else:
            raise Exception("Please provide either samples or pass number of "
                            "samples!")

        # Generate random samples if necessary
        Samples = self._get_sample() if samples is None else samples

        # Run the original model with the generated samples
        if outputs is None:
            outputs = self._eval_model(Samples, key_str='validSet')

        # Run the PCE model with the generated samples
        pce_outputs, _ = MetaModel.eval_metamodel(samples=Samples)

        self.rmse = {}
        self.valid_error = {}
        # Loop over the keys and compute RMSE error.
        for key in Model.Output.names:
            # Root mean square
            self.rmse[key] = mean_squared_error(outputs[key],
                                                pce_outputs[key],
                                                squared=False,
                                                multioutput='raw_values')
            # Validation error
            self.valid_error[key] = (self.rmse[key]**2 / self.n_samples) / \
                np.var(outputs[key], ddof=1, axis=0)

            # Print a report table
            print("\n>>>>> Errors of {} <<<<<".format(key))
            print("\nIndex |  RMSE   |  Validation Error")
            print('-'*35)
            print('\n'.join(f'{i+1}  |  {k:.3e}  |  {j:.3e}'
                            for i, (k, j)
                            in enumerate(zip(self.rmse[key],
                                             self.valid_error[key]))))
        # Save error dicts in PCEModel object
        self.MetaModel.rmse = self.rmse
        self.MetaModel.valid_error = self.valid_error

        return self.rmse, self.valid_error

    # -------------------------------------------------------------------------
    def plot_seq_design_diagnostics(self, ref_BME_KLD=None, save_fig=True):
        """
        Plots the Bayesian Model Evidence (BME) and Kullback-Leibler divergence
        (KLD) for the sequential design.

        Parameters
        ----------
        ref_BME_KLD : array, optional
            Reference BME and KLD . The default is `None`.
        save_fig : bool, optional
            Whether to save the figures. The default is `True`.

        Returns
        -------
        None.
        """
        PCEModel = self.MetaModel
        n_init_samples = PCEModel.ExpDesign.n_init_samples
        n_total_samples = PCEModel.ExpDesign.X.shape[0]

        if save_fig:
            newpath = f'Outputs_PostProcessing_{self.name}/'
            if not os.path.exists(newpath):
                os.makedirs(newpath)
            # create a PdfPages object
            pdf = PdfPages(f'./{newpath}/seqPCEModelDiagnostics.pdf')

        plotList = ['Modified LOO error', 'Validation error', 'KLD', 'BME',
                    'RMSEMean', 'RMSEStd', 'Hellinger distance']
        seqList = [PCEModel.SeqModifiedLOO, PCEModel.seqValidError,
                   PCEModel.SeqKLD, PCEModel.SeqBME, PCEModel.seqRMSEMean,
                   PCEModel.seqRMSEStd, PCEModel.SeqDistHellinger]

        markers = ('x', 'o', 'd', '*', '+')
        colors = ('k', 'darkgreen', 'b', 'navy', 'darkred')

        # Plot the evolution of the diagnostic criteria of the
        # Sequential Experimental Design.
        for plotidx, plot in enumerate(plotList):
            fig, ax = plt.subplots()
            seq_dict = seqList[plotidx]
            name_util = list(seq_dict.keys())
            if len(name_util) == 0:
                continue

            # Box plot when Replications have been detected.
            if any(int(name.split("rep_", 1)[1]) > 1 for name in name_util):
                # Extract the values from dict
                sorted_seq_opt = {}
                # Number of replications
                n_reps = PCEModel.ExpDesign.nReprications

                # Get the list of utility function names
                # Handle if only one UtilityFunction is provided
                if not isinstance(PCEModel.ExpDesign.UtilityFunction, list):
                    util_funcs = [PCEModel.ExpDesign.UtilityFunction]
                else:
                    util_funcs = PCEModel.ExpDesign.UtilityFunction

                for util in util_funcs:
                    sortedSeq = {}
                    # min number of runs available from reps
                    n_runs = min([seq_dict[f'{util}_rep_{i+1}'].shape[0]
                                  for i in range(n_reps)])
                    for runIdx in range(n_runs):
                        values = []
                        for key in seq_dict.keys():
                            if util in key:
                                values.append(seq_dict[key][runIdx].mean())
                        sortedSeq['SeqItr_'+str(runIdx)] = np.array(values)
                    sorted_seq_opt[util] = sortedSeq

                # BoxPlot helper: one box-plot series per utility function
                def draw_plot(data, labels, edge_color, fill_color, idx):
                    pos = labels - (idx-1)
                    bp = plt.boxplot(data, positions=pos, labels=labels,
                                     patch_artist=True, sym='', widths=0.75)
                    elements = ['boxes', 'whiskers', 'fliers', 'means',
                                'medians', 'caps']
                    for element in elements:
                        plt.setp(bp[element], color=edge_color[idx])
                    for patch in bp['boxes']:
                        patch.set(facecolor=fill_color[idx])

                if PCEModel.ExpDesign.n_new_samples != 1:
                    step1 = PCEModel.ExpDesign.n_new_samples
                    step2 = 1
                else:
                    step1 = 5
                    step2 = 5
                edge_color = ['red', 'blue', 'green']
                fill_color = ['tan', 'cyan', 'lightgreen']
                plot_label = plot

                # Plot for different Utility Functions
                for idx, util in enumerate(util_funcs):
                    all_errors = np.empty((n_reps, 0))
                    for key in list(sorted_seq_opt[util].keys()):
                        errors = sorted_seq_opt.get(util, {}).get(key)[:, None]
                        all_errors = np.hstack((all_errors, errors))

                    # Special cases for BME and KLD
                    if plot == 'KLD' or plot == 'BME':
                        # BME convergence if refBME is provided
                        if ref_BME_KLD is not None:
                            if plot == 'BME':
                                refValue = ref_BME_KLD[0]
                                plot_label = r'$BME/BME^{Ref.}$'
                            if plot == 'KLD':
                                refValue = ref_BME_KLD[1]
                                # FIX: raw string — the original non-raw
                                # literal turned '\t' in '\theta' into a tab.
                                plot_label = r'$D_{KL}[p(\theta|y_*),' \
                                    r'p(\theta)] / D_{KL}^{Ref.}' \
                                    r'[p(\theta|y_*), p(\theta)]$'

                            # Difference between BME/KLD and the ref. values
                            all_errors = np.divide(
                                all_errors,
                                np.full((all_errors.shape), refValue))

                            # Plot baseline for zero, i.e. no difference
                            plt.axhline(y=1.0, xmin=0, xmax=1, c='green',
                                        ls='--', lw=2)

                    # Plot each UtilFuncs
                    labels = np.arange(n_init_samples, n_total_samples+1,
                                       step1)
                    draw_plot(all_errors[:, ::step2], labels, edge_color,
                              fill_color, idx)

                plt.xticks(labels, labels)
                # Set the major and minor locators
                ax.xaxis.set_major_locator(ticker.AutoLocator())
                ax.xaxis.set_minor_locator(ticker.AutoMinorLocator())
                ax.xaxis.grid(True, which='major', linestyle='-')
                ax.xaxis.grid(True, which='minor', linestyle='--')

                # Legend
                legend_elements = []
                for idx, util in enumerate(util_funcs):
                    legend_elements.append(Patch(facecolor=fill_color[idx],
                                                 edgecolor=edge_color[idx],
                                                 label=util))
                plt.legend(handles=legend_elements[::-1], loc='best')

                if plot != 'BME' and plot != 'KLD':
                    plt.yscale('log')
                plt.autoscale(True)
                plt.xlabel('\\# of training samples')
                plt.ylabel(plot_label)
                plt.title(plot)

                if save_fig:
                    # save the current figure
                    pdf.savefig(fig, bbox_inches='tight')
                    # Destroy the current plot
                    plt.clf()
                    # Save arrays into files
                    f = open(f'./{newpath}/Seq{plot}.txt', 'w')
                    f.write(str(sorted_seq_opt))
                    f.close()
            else:
                for idx, name in enumerate(name_util):
                    seq_values = seq_dict[name]
                    if PCEModel.ExpDesign.n_new_samples != 1:
                        step = PCEModel.ExpDesign.n_new_samples
                    else:
                        step = 1
                    x_idx = np.arange(n_init_samples, n_total_samples+1, step)
                    if n_total_samples not in x_idx:
                        x_idx = np.hstack((x_idx, n_total_samples))

                    if plot == 'KLD' or plot == 'BME':
                        # BME convergence if refBME is provided
                        if ref_BME_KLD is not None:
                            if plot == 'BME':
                                refValue = ref_BME_KLD[0]
                                plot_label = r'$BME/BME^{Ref.}$'
                            if plot == 'KLD':
                                refValue = ref_BME_KLD[1]
                                # FIX: raw string (see note above)
                                plot_label = r'$D_{KL}[p(\theta|y_*),' \
                                    r'p(\theta)] / D_{KL}^{Ref.}' \
                                    r'[p(\theta|y_*), p(\theta)]$'

                            # Difference between BME/KLD and the ref. values
                            values = np.divide(seq_values,
                                               np.full((seq_values.shape),
                                                       refValue))

                            # Plot baseline for zero, i.e. no difference
                            plt.axhline(y=1.0, xmin=0, xmax=1, c='green',
                                        ls='--', lw=2)

                            # Set the limits
                            plt.ylim([1e-1, 1e1])

                            # Create the plots
                            plt.semilogy(x_idx, values, marker=markers[idx],
                                         color=colors[idx], ls='--', lw=2,
                                         label=name.split("_rep", 1)[0])
                        else:
                            plot_label = plot
                            # Create the plots
                            plt.plot(x_idx, seq_values, marker=markers[idx],
                                     color=colors[idx], ls='--', lw=2,
                                     label=name.split("_rep", 1)[0])
                    else:
                        plot_label = plot
                        seq_values = np.nan_to_num(seq_values)

                        # Plot the error evolution for each output
                        for i in range(seq_values.shape[1]):
                            plt.semilogy(x_idx, seq_values[:, i], ls='--',
                                         lw=2, marker=markers[idx],
                                         color=colors[idx], alpha=0.15)
                        plt.semilogy(x_idx, seq_values, marker=markers[idx],
                                     ls='--', lw=2, color=colors[idx],
                                     label=name.split("_rep", 1)[0])

                # Set the major and minor locators
                ax.xaxis.set_major_locator(ticker.AutoLocator())
                ax.xaxis.set_minor_locator(ticker.AutoMinorLocator())
                ax.xaxis.grid(True, which='major', linestyle='-')
                ax.xaxis.grid(True, which='minor', linestyle='--')

                ax.tick_params(axis='both', which='major', direction='in',
                               width=3, length=10)
                ax.tick_params(axis='both', which='minor', direction='in',
                               width=2, length=8)
                plt.xlabel('Number of runs')
                plt.ylabel(plot_label)
                plt.title(plot)
                plt.legend(frameon=True)

                if save_fig:
                    # save the current figure
                    pdf.savefig(fig, bbox_inches='tight')
                    # Destroy the current plot
                    plt.clf()
                    # ---------------- Saving arrays into files ---------------
                    np.save(f'./{newpath}/Seq{plot}.npy', seq_values)

        # Close the pdf (FIX: only if it was opened)
        if save_fig:
            pdf.close()
        return

    # -------------------------------------------------------------------------
    def sobol_indices(self, xlabel='Time [s]', plot_type=None, save_fig=True):
        """
        Provides Sobol indices as a sensitivity measure to infer the importance
        of the input parameters. See Eq. 27 in [1] for more details. For the
        case with Principal component analysis refer to [2].

        [1] Global sensitivity analysis: A flexible and efficient framework
        with an example from stochastic hydrogeology S. Oladyshkin, F.P.
        de Barros, W. Nowak https://doi.org/10.1016/j.advwatres.2011.11.001

        [2] Nagel, J.B., Rieckermann, J. and Sudret, B., 2020. Principal
        component analysis and sparse polynomial chaos expansions for global
        sensitivity analysis and model calibration: Application to urban
        drainage simulation. Reliability Engineering & System Safety, 195,
        p.106737.

        Parameters
        ----------
        xlabel : str, optional
            Label of the x-axis. The default is `'Time [s]'`.
        plot_type : str, optional
            Plot type. The default is `None`. This corresponds to line plot.
            Bar chart can be selected by `bar`.
        save_fig : bool, optional
            Whether to save the figures. The default is `True`.

        Returns
        -------
        sobol_cell: dict
            Sobol indices.
        total_sobol: dict
            Total Sobol indices.
        """
        # Extract the necessary variables
        PCEModel = self.MetaModel
        basis_dict = PCEModel.basis_dict
        coeffs_dict = PCEModel.coeffs_dict
        n_params = PCEModel.n_params
        max_order = np.max(PCEModel.pce_deg)
        self.sobol_cell = {}
        self.total_sobol = {}

        for Output in PCEModel.ModelObj.Output.names:

            n_meas_points = len(coeffs_dict[Output])

            # Initialize the (cell) array containing the (total) Sobol indices.
            sobol_array = dict.fromkeys(range(1, max_order+1), [])
            sobol_cell_array = dict.fromkeys(range(1, max_order+1), [])

            for i_order in range(1, max_order+1):
                n_comb = math.comb(n_params, i_order)
                sobol_cell_array[i_order] = np.zeros((n_comb, n_meas_points))

            total_sobol_array = np.zeros((n_params, n_meas_points))

            # Initialize the cell to store the names of the variables
            TotalVariance = np.zeros((n_meas_points))

            # Loop over all measurement points and calculate sobol indices
            for pIdx in range(n_meas_points):

                # Extract the basis indices (alpha) and coefficients
                Basis = basis_dict[Output][f'y_{pIdx+1}']
                try:
                    clf_poly = PCEModel.clf_poly[Output][f'y_{pIdx+1}']
                    PCECoeffs = clf_poly.coef_
                except:
                    PCECoeffs = coeffs_dict[Output][f'y_{pIdx+1}']

                # Compute total variance
                TotalVariance[pIdx] = np.sum(np.square(PCECoeffs[1:]))

                nzidx = np.where(PCECoeffs != 0)[0]
                # Set all the Sobol indices equal to zero in the presence of a
                # null output.
                if len(nzidx) == 0:
                    # This is buggy.
                    for i_order in range(1, max_order+1):
                        sobol_cell_array[i_order][:, pIdx] = 0

                # Otherwise compute them by summing well-chosen coefficients
                else:
                    nz_basis = Basis[nzidx]
                    for i_order in range(1, max_order+1):
                        idx = np.where(np.sum(nz_basis > 0, axis=1) == i_order)
                        subbasis = nz_basis[idx]
                        Z = np.array(list(combinations(range(n_params),
                                                       i_order)))

                        for q in range(Z.shape[0]):
                            Zq = Z[q]
                            subsubbasis = subbasis[:, Zq]
                            subidx = np.prod(subsubbasis, axis=1) > 0
                            sum_ind = nzidx[idx[0][subidx]]
                            if TotalVariance[pIdx] == 0.0:
                                sobol_cell_array[i_order][q, pIdx] = 0.0
                            else:
                                sobol = np.sum(np.square(PCECoeffs[sum_ind]))
                                sobol /= TotalVariance[pIdx]
                                sobol_cell_array[i_order][q, pIdx] = sobol

                    # Compute the TOTAL Sobol indices.
                    for ParIdx in range(n_params):
                        idx = nz_basis[:, ParIdx] > 0
                        sum_ind = nzidx[idx]

                        if TotalVariance[pIdx] == 0.0:
                            total_sobol_array[ParIdx, pIdx] = 0.0
                        else:
                            sobol = np.sum(np.square(PCECoeffs[sum_ind]))
                            sobol /= TotalVariance[pIdx]
                            total_sobol_array[ParIdx, pIdx] = sobol

                # ----- if PCA selected: Compute covariance -----
                if PCEModel.dim_red_method.lower() == 'pca':
                    cov_Z_p_q = np.zeros((n_params))
                    # Extract the basis indices (alpha) and coefficients for
                    # next component
                    if pIdx < n_meas_points-1:
                        nextBasis = basis_dict[Output][f'y_{pIdx+2}']
                        try:
                            clf_poly = PCEModel.clf_poly[Output][f'y_{pIdx+2}']
                            nextPCECoeffs = clf_poly.coef_
                        except:
                            nextPCECoeffs = coeffs_dict[Output][f'y_{pIdx+2}']

                        # Choose the common non-zero basis
                        mask = (Basis[:, None] == nextBasis).all(-1).any(-1)
                        similar_basis = Basis[mask]

                        # Compute the TOTAL Sobol indices.
                        for ParIdx in range(n_params):
                            idx = similar_basis[:, ParIdx] > 0
                            try:
                                sum_is = nzidx[idx]
                                cov_Z_p_q[ParIdx] = np.sum(
                                    PCECoeffs[sum_ind] * nextPCECoeffs[sum_is])
                            except:
                                cov_Z_p_q[ParIdx] = 0.0

            # Compute the sobol indices according to Ref. 2
            if PCEModel.dim_red_method.lower() == 'pca':
                n_c_points = PCEModel.ExpDesign.Y[Output].shape[1]
                PCA = PCEModel.pca[Output]
                compPCA = PCA.components_
                nComp = compPCA.shape[0]
                var_Z_p = PCA.explained_variance_

                # Extract the sobol index of the components
                for i_order in range(1, max_order+1):
                    n_comb = math.comb(n_params, i_order)
                    sobol_array[i_order] = np.zeros((n_comb, n_c_points))
                    Z = np.array(list(combinations(range(n_params), i_order)))

                    for q in range(Z.shape[0]):
                        S_Z_i = sobol_cell_array[i_order][q]

                        for tIdx in range(n_c_points):
                            var_Y_t = np.var(
                                PCEModel.ExpDesign.Y[Output][:, tIdx])
                            if var_Y_t == 0.0:
                                term1, term2 = 0.0, 0.0
                            else:
                                term1 = np.sum(
                                    [S_Z_i[i]*(var_Z_p[i] *
                                               (compPCA[i, tIdx]**2)/var_Y_t)
                                     for i in range(nComp)])

                                # Term 2
                                # cov_Z_p_q = np.ones((nComp))# TODO: from coeffs
                                Phi_t_p = compPCA[:nComp-1]
                                Phi_t_q = compPCA
                                term2 = 2 * np.sum(
                                    [cov_Z_p_q[ParIdx] * Phi_t_p[i, tIdx] *
                                     Phi_t_q[i, tIdx]/var_Y_t
                                     for i in range(nComp-1)])

                            sobol_array[i_order][q, tIdx] = term1  # + term2

                # Compute the TOTAL Sobol indices.
                total_sobol = np.zeros((n_params, n_c_points))
                for ParIdx in range(n_params):
                    S_Z_i = total_sobol_array[ParIdx]

                    for tIdx in range(n_c_points):
                        var_Y_t = np.var(PCEModel.ExpDesign.Y[Output][:, tIdx])
                        if var_Y_t == 0.0:
                            term1, term2 = 0.0, 0.0
                        else:
                            term1 = 0
                            for i in range(nComp):
                                term1 += S_Z_i[i] * var_Z_p[i] * \
                                    (compPCA[i, tIdx]**2) / var_Y_t

                            # Term 2
                            # cov_Z_p_q = np.ones((nComp))# TODO: from coeffs
                            Phi_t_p = compPCA[:nComp-1]
                            Phi_t_q = compPCA
                            term2 = 0
                            for i in range(nComp-1):
                                term2 += cov_Z_p_q[ParIdx] * Phi_t_p[i, tIdx] \
                                    * Phi_t_q[i, tIdx] / var_Y_t
                            term2 *= 2

                        total_sobol[ParIdx, tIdx] = term1 + term2

                self.sobol_cell[Output] = sobol_array
                self.total_sobol[Output] = total_sobol
            else:
                self.sobol_cell[Output] = sobol_cell_array
                self.total_sobol[Output] = total_sobol_array

        # ---------------- Plot -----------------------
        par_names = PCEModel.ExpDesign.par_names
        x_values_orig = PCEModel.ExpDesign.x_values

        cases = ['']
        for case in cases:
            newpath = (f'Outputs_PostProcessing_{self.name}/')
            if not os.path.exists(newpath):
                os.makedirs(newpath)

            # FIX: `name` is used unconditionally below (np.savetxt), so it
            # must be bound even when save_fig is False.
            name = case+'_' if 'Valid' in cases else ''
            if save_fig:
                # create a PdfPages object
                pdf = PdfPages('./'+newpath+name+'Sobol_indices.pdf')

            fig = plt.figure()

            for outIdx, Output in enumerate(PCEModel.ModelObj.Output.names):

                # Extract total Sobol indices
                total_sobol = self.total_sobol[Output]

                # Extract a list of x values
                if type(x_values_orig) is dict:
                    x = x_values_orig[Output]
                else:
                    x = x_values_orig

                if plot_type == 'bar':
                    ax = fig.add_axes([0, 0, 1, 1])
                    dict1 = {xlabel: x}
                    dict2 = {param: sobolIndices for param, sobolIndices
                             in zip(par_names, total_sobol)}
                    df = pd.DataFrame({**dict1, **dict2})
                    df.plot(x=xlabel, y=par_names, kind="bar", ax=ax, rot=0,
                            colormap='Dark2')
                    ax.set_ylabel('Total Sobol indices, $S^T$')
                else:
                    for i, sobolIndices in enumerate(total_sobol):
                        plt.plot(x, sobolIndices, label=par_names[i],
                                 marker='x', lw=2.5)
                    plt.ylabel('Total Sobol indices, $S^T$')
                    plt.xlabel(xlabel)

                plt.title(f'Sensitivity analysis of {Output}')
                if plot_type != 'bar':
                    plt.legend(loc='best', frameon=True)

                # Save indices
                np.savetxt(f'./{newpath}{name}totalsobol_' +
                           Output.replace('/', '_') + '.csv',
                           total_sobol.T, delimiter=',',
                           header=','.join(par_names), comments='')

                if save_fig:
                    # save the current figure
                    pdf.savefig(fig, bbox_inches='tight')
                    # Destroy the current plot
                    plt.clf()

            # FIX: only close the pdf if it was opened (save_fig=True)
            if save_fig:
                pdf.close()

        return self.sobol_cell, self.total_sobol

    # -------------------------------------------------------------------------
    def check_reg_quality(self, n_samples=1000, samples=None, save_fig=True):
        """
        Checks the quality of the metamodel for single output models based on:
        https://towardsdatascience.com/how-do-you-check-the-quality-of-your-regression-model-in-python-fa61759ff685

        Parameters
        ----------
        n_samples : int, optional
            Number of parameter sets to use for the check. The default is 1000.
        samples : array of shape (n_samples, n_params), optional
            Parameter sets to use for the check. The default is None.
        save_fig : bool, optional
            Whether to save the figures. The default is True.

        Returns
        -------
        None.
        """
        MetaModel = self.MetaModel

        if samples is None:
            self.n_samples = n_samples
            samples = self._get_sample()
        else:
            self.n_samples = samples.shape[0]

        # Evaluate the original and the surrogate model
        y_val = self._eval_model(samples, key_str='valid')
        y_pce_val, _ = MetaModel.eval_metamodel(samples=samples)

        # Open a pdf for the plots
        if save_fig:
            newpath = (r'Outputs_PostProcessing_{0}/'.format(self.name))
            if not os.path.exists(newpath):
                os.makedirs(newpath)

        # Fit the data(train the model)
        for key in y_pce_val.keys():

            y_pce_val_ = y_pce_val[key]
            y_val_ = y_val[key]

            # ------ Residuals vs. predicting variables ------
            # Check the assumptions of linearity and independence
            fig1 = plt.figure()
            plt.title(key+": Residuals vs. Predicting variables")
            residuals = y_val_ - y_pce_val_
            plt.scatter(x=y_val_, y=residuals, color='blue', edgecolor='k')
            plt.grid(True)
            xmin, xmax = min(y_val_), max(y_val_)
            plt.hlines(y=0, xmin=xmin*0.9, xmax=xmax*1.1, color='red', lw=3,
                       linestyle='--')
            plt.xlabel(key)
            plt.ylabel('Residuals')
            plt.show()

            if save_fig:
                # save the current figure
                fig1.savefig(f'./{newpath}/Residuals_vs_PredVariables.pdf',
                             bbox_inches='tight')
                # Destroy the current plot
                plt.clf()

            # ------ Fitted vs. residuals ------
            # Check the assumptions of linearity and independence
            fig2 = plt.figure()
            # FIX: the original title was a copy-paste of the previous plot's
            # title; this figure shows fitted values vs. residuals.
            plt.title(key+": Fitted vs. residuals")
            plt.scatter(x=y_pce_val_, y=residuals, color='blue', edgecolor='k')
            plt.grid(True)
            xmin, xmax = min(y_val_), max(y_val_)
            plt.hlines(y=0, xmin=xmin*0.9, xmax=xmax*1.1, color='red', lw=3,
                       linestyle='--')
            plt.xlabel(key)
            plt.ylabel('Residuals')
            plt.show()

            if save_fig:
                # save the current figure
                fig2.savefig(f'./{newpath}/Fitted_vs_Residuals.pdf',
                             bbox_inches='tight')
                # Destroy the current plot
                plt.clf()

            # ------ Histogram of normalized residuals ------
            fig3 = plt.figure()
            resid_pearson = residuals / (max(residuals)-min(residuals))
            plt.hist(resid_pearson, bins=20, edgecolor='k')
            plt.ylabel('Count')
            plt.xlabel('Normalized residuals')
            plt.title(f"{key}: Histogram of normalized residuals")

            # Normality (Shapiro-Wilk) test of the residuals
            ax = plt.gca()
            _, p = stats.shapiro(residuals)
            # FIX: the original branches were swapped. A small p-value
            # REJECTS the null hypothesis that the residuals are normal.
            if p < 0.01:
                annText = "The normality assumption may not hold."
            else:
                annText = "The residuals seem to come from Gaussian process."
            at = AnchoredText(annText, prop=dict(size=30), frameon=True,
                              loc='upper left')
            at.patch.set_boxstyle("round,pad=0.,rounding_size=0.2")
            ax.add_artist(at)
            plt.show()

            if save_fig:
                # save the current figure
                fig3.savefig(f'./{newpath}/Hist_NormResiduals.pdf',
                             bbox_inches='tight')
                # Destroy the current plot
                plt.clf()

            # ------ Q-Q plot of the normalized residuals ------
            plt.figure()
            fig4 = qqplot(resid_pearson, line='45', fit='True')
            plt.xticks()
            plt.yticks()
            plt.xlabel("Theoretical quantiles")
            plt.ylabel("Sample quantiles")
            plt.title(key+": Q-Q plot of normalized residuals")
            plt.grid(True)
            plt.show()

            if save_fig:
                # save the current figure
                fig4.savefig(f'./{newpath}/QQPlot_NormResiduals.pdf',
                             bbox_inches='tight')
                # Destroy the current plot
                plt.clf()

    # -------------------------------------------------------------------------
    def eval_pce_model_3d(self, save_fig=True):
        """
        Evaluates the meta-model and the model on a hard-coded 2-parameter 3D
        grid and plots both response surfaces.

        Parameters
        ----------
        save_fig : bool, optional
            Whether to save the figures. The default is True.

        Returns
        -------
        None.
        """
        self.n_samples = 1000

        PCEModel = self.MetaModel
        Model = self.MetaModel.ModelObj
        n_samples = self.n_samples

        # Create 3D-Grid
        # TODO: Make it general
        x = np.linspace(-5, 10, n_samples)
        y = np.linspace(0, 15, n_samples)
        X, Y = np.meshgrid(x, y)

        PCE_Z = np.zeros((self.n_samples, self.n_samples))
        Model_Z = np.zeros((self.n_samples, self.n_samples))

        for idxMesh in range(self.n_samples):
            sample_mesh = np.vstack((X[:, idxMesh], Y[:, idxMesh])).T
            univ_p_val = PCEModel.univ_basis_vals(sample_mesh)

            for Outkey, ValuesDict in PCEModel.coeffs_dict.items():

                pce_out_mean = np.zeros((len(sample_mesh), len(ValuesDict)))
                pce_out_std = np.zeros((len(sample_mesh), len(ValuesDict)))
                model_outs = np.zeros((len(sample_mesh), len(ValuesDict)))

                for Inkey, InIdxValues in ValuesDict.items():
                    idx = int(Inkey.split('_')[1]) - 1
                    basis_deg_ind = PCEModel.basis_dict[Outkey][Inkey]
                    clf_poly = PCEModel.clf_poly[Outkey][Inkey]

                    PSI_Val = PCEModel.create_psi(basis_deg_ind, univ_p_val)

                    # Prediction with error bar
                    y_mean, y_std = clf_poly.predict(PSI_Val, return_std=True)

                    pce_out_mean[:, idx] = y_mean
                    pce_out_std[:, idx] = y_std

                    # Model evaluation
                    model_out_dict, _ = Model.run_model_parallel(
                        sample_mesh, key_str='Valid3D')
                    model_outs[:, idx] = model_out_dict[Outkey].T

                PCE_Z[:, idxMesh] = y_mean
                Model_Z[:, idxMesh] = model_outs[:, 0]

        # ---------------- 3D plot for PCEModel -----------------------
        fig_PCE = plt.figure()
        ax = plt.axes(projection='3d')
        ax.plot_surface(X, Y, PCE_Z, rstride=1, cstride=1, cmap='viridis',
                        edgecolor='none')
        ax.set_title('PCEModel')
        ax.set_xlabel('$x_1$')
        ax.set_ylabel('$x_2$')
        ax.set_zlabel('$f(x_1,x_2)$')

        plt.grid()
        plt.show()

        if save_fig:
            # Saving the figure
            newpath = f'Outputs_PostProcessing_{self.name}/'
            if not os.path.exists(newpath):
                os.makedirs(newpath)

            # save the figure to file
            fig_PCE.savefig(f'./{newpath}/3DPlot_PCEModel.pdf', format="pdf",
                            bbox_inches='tight')
            plt.close(fig_PCE)

        # ---------------- 3D plot for Model -----------------------
        fig_Model = plt.figure()
        ax = plt.axes(projection='3d')
        # FIX: the original plotted PCE_Z here, so the "Model" figure showed
        # the surrogate surface instead of the model's own response.
        ax.plot_surface(X, Y, Model_Z, rstride=1, cstride=1, cmap='viridis',
                        edgecolor='none')
        ax.set_title('Model')
        ax.set_xlabel('$x_1$')
        ax.set_ylabel('$x_2$')
        ax.set_zlabel('$f(x_1,x_2)$')
        plt.grid()
        plt.show()

        if save_fig:
            # Save the figure
            fig_Model.savefig(f'./{newpath}/3DPlot_Model.pdf', format="pdf",
                              bbox_inches='tight')
            plt.close(fig_Model)

        return

    # -------------------------------------------------------------------------
    def _compute_pce_moments(self):
        """
        Computes the first two moments using the PCE-based meta-model.

        Returns
        -------
        None.
        """
        MetaModel = self.MetaModel
        self.pce_means = {}
        self.pce_stds = {}

        for Outkey, ValuesDict in MetaModel.coeffs_dict.items():

            pce_mean = np.zeros((len(ValuesDict)))
            pce_var = np.zeros((len(ValuesDict)))

            for Inkey, InIdxValues in ValuesDict.items():
                idx = int(Inkey.split('_')[1]) - 1
                coeffs = MetaModel.coeffs_dict[Outkey][Inkey]

                # Mean = c_0
                if coeffs[0] != 0:
                    pce_mean[idx] = coeffs[0]
                else:
                    pce_mean[idx] = MetaModel.clf_poly[Outkey][Inkey].intercept_

                # Var = sum(coeffs[1:]**2)
                pce_var[idx] = np.sum(np.square(coeffs[1:]))

            # Back transformation if PCA is selected.
            if MetaModel.dim_red_method.lower() == 'pca':
                PCA = MetaModel.pca[Outkey]
                self.pce_means[Outkey] = PCA.mean_
                self.pce_means[Outkey] += np.dot(pce_mean, PCA.components_)
                self.pce_stds[Outkey] = np.sqrt(np.dot(pce_var,
                                                       PCA.components_**2))
            else:
                self.pce_means[Outkey] = pce_mean
                self.pce_stds[Outkey] = np.sqrt(pce_var)

            # Print a report table
            print("\n>>>>> Moments of {} <<<<<".format(Outkey))
            print("\nIndex  |  Mean   |  Std. deviation")
            print('-'*35)
            print('\n'.join(f'{i+1}  |  {k:.3e}  |  {j:.3e}'
                            for i, (k, j)
                            in enumerate(zip(self.pce_means[Outkey],
                                             self.pce_stds[Outkey]))))
        print('-'*40)

    # -------------------------------------------------------------------------
    def _get_sample(self, n_samples=None):
        """
        Generates random samples taken from the input parameter space.

        Returns
        -------
        samples : array of shape (n_samples, n_params)
            Generated samples.
        """
        if n_samples is None:
            n_samples = self.n_samples
        PCEModel = self.MetaModel
        self.samples = PCEModel.ExpDesign.generate_samples(n_samples,
                                                           'random')
        return self.samples

    # -------------------------------------------------------------------------
    def _eval_model(self, samples=None, key_str='Valid'):
        """
        Evaluates Forward Model for the given number of self.samples or given
        samples.

        Parameters
        ----------
        samples : array of shape (n_samples, n_params), optional
            Samples to evaluate the model at. The default is None.
        key_str : str, optional
            Key string pass to the model. The default is 'Valid'.

        Returns
        -------
        model_outs : dict
            Dictionary of results.
        """
        Model = self.MetaModel.ModelObj

        if samples is None:
            samples = self._get_sample()
            self.samples = samples
        else:
            self.n_samples = len(samples)

        model_outs, _ = Model.run_model_parallel(samples, key_str=key_str)

        return model_outs

    # -------------------------------------------------------------------------
    def _plot_validation(self, save_fig=True):
        """
        Plots outputs for visual comparison of metamodel outputs with that of
        the (full) original model.

        Parameters
        ----------
        save_fig : bool, optional
            Save the plots. The default is True.

        Returns
        -------
        None.
        """
        PCEModel = self.MetaModel

        # get the samples
        x_val = self.samples
        y_pce_val = self.pce_out_mean
        y_val = self.model_out_dict

        # Open a pdf for the plots
        if save_fig:
            newpath = f'Outputs_PostProcessing_{self.name}/'
            if not os.path.exists(newpath):
                os.makedirs(newpath)

            # create a PdfPages object
            pdf1 = PdfPages(f'./{newpath}/Model_vs_PCEModel.pdf')

        fig = plt.figure()
        # Fit the data(train the model)
        for key in y_pce_val.keys():

            y_pce_val_ = y_pce_val[key]
            y_val_ = y_val[key]

            regression_model = LinearRegression()
            regression_model.fit(y_pce_val_, y_val_)

            # Predict
            x_new = np.linspace(np.min(y_pce_val_), np.max(y_val_), 100)
            y_predicted = regression_model.predict(x_new[:, np.newaxis])

            plt.scatter(y_pce_val_, y_val_, color='gold', linewidth=2)
            plt.plot(x_new, y_predicted, color='k')

            # Calculate the adjusted R_squared and RMSE
            # the total number of explanatory variables in the model
            # (not including the constant term)
            length_list = []
            for key, value in PCEModel.coeffs_dict[key].items():
                length_list.append(len(value))
            n_predictors = min(length_list)
            n_samples = x_val.shape[0]

            R2 = r2_score(y_pce_val_, y_val_)
            AdjR2 = 1 - (1 - R2) * (n_samples - 1) / \
                (n_samples - n_predictors - 1)
            rmse = mean_squared_error(y_pce_val_, y_val_, squared=False)

            plt.annotate(f'RMSE = {rmse:.3f}\n Adjusted $R^2$ = {AdjR2:.3f}',
                         xy=(0.05, 0.85), xycoords='axes fraction')

            plt.ylabel("Original Model")
            plt.xlabel("PCE Model")
            plt.grid()
            plt.show()

            if save_fig:
                # save the current figure
                pdf1.savefig(fig, bbox_inches='tight')
                # Destroy the current plot
                plt.clf()

        # Close the pdfs (FIX: only if opened)
        if save_fig:
            pdf1.close()

    # -------------------------------------------------------------------------
    def _plot_validation_multi(self, x_values=[], x_axis="x [m]",
                               save_fig=True):
        """
        Plots outputs for visual comparison of metamodel outputs with that of
        the (full) multioutput original model

        Parameters
        ----------
        x_values : list or array, optional
            List of x values. The default is [].
        x_axis : str, optional
            Label of the x axis. The default is "x [m]".
        save_fig : bool, optional
            Whether to save the figures. The default is True.

        Returns
        -------
        None.
        """
        Model = self.MetaModel.ModelObj

        if save_fig:
            newpath = f'Outputs_PostProcessing_{self.name}/'
            if not os.path.exists(newpath):
                os.makedirs(newpath)

            # create a PdfPages object
            pdf = PdfPages(f'./{newpath}/Model_vs_PCEModel.pdf')

        # List of markers and colors
        color = cycle((['b', 'g', 'r', 'y', 'k']))
        marker = cycle(('x', 'd', '+', 'o', '*'))

        fig = plt.figure()
        # Plot the model vs PCE model
        for keyIdx, key in enumerate(Model.Output.names):

            y_pce_val = self.pce_out_mean[key]
            y_pce_val_std = self.pce_out_std[key]
            y_val = self.model_out_dict[key]
            try:
                x = self.model_out_dict['x_values'][key]
            # FIX: a missing 'x_values' entry raises KeyError (dict access),
            # which the original `except IndexError` did not catch.
            except (KeyError, IndexError):
                x = x_values

            for idx in range(y_val.shape[0]):
                Color = next(color)
                Marker = next(marker)

                plt.plot(x, y_val[idx], color=Color, marker=Marker,
                         label='$Y_{%s}^M$'%(idx+1))

                plt.plot(x, y_pce_val[idx], color=Color, marker=Marker,
                         linestyle='--', label='$Y_{%s}^{PCE}$'%(idx+1))
                plt.fill_between(x, y_pce_val[idx]-1.96*y_pce_val_std[idx],
                                 y_pce_val[idx]+1.96*y_pce_val_std[idx],
                                 color=Color, alpha=0.15)

                # Calculate the RMSE
                rmse = mean_squared_error(y_pce_val, y_val, squared=False)
                R2 = r2_score(y_pce_val[idx].reshape(-1, 1),
                              y_val[idx].reshape(-1, 1))

                plt.annotate(f'RMSE = {rmse:.3f}\n $R^2$ = {R2:.3f}',
                             xy=(0.2, 0.75), xycoords='axes fraction')

            plt.ylabel(key)
            plt.xlabel(x_axis)
            plt.legend(loc='best')
            plt.grid()

            if save_fig:
                # save the current figure
                pdf.savefig(fig, bbox_inches='tight')
                # Destroy the current plot
                plt.clf()

        # FIX: only close the pdf if it was opened (save_fig=True)
        if save_fig:
            pdf.close()

        # Zip the subdirectories
        Model.zip_subdirs(f'{Model.name}valid', f'{Model.name}valid_')
Methods
def plot_moments(self, xlabel='Time [s]', plot_type=None, save_fig=True)
-
Plots the moments in a pdf format in the directory
Outputs_PostProcessing
.Parameters
xlabel
:str
, optional- String to be displayed as x-label. The default is
'Time [s]'
. plot_type
:str
, optional- Options: bar or line. The default is
None
. save_fig
:bool
, optional- Save figure or not. The default is
True
.
Returns
pce_means
:dict
- Mean of the model outputs.
pce_stds
:dict
- Standard deviation of the model outputs.
Expand source code
def plot_moments(self, xlabel='Time [s]', plot_type=None, save_fig=True):
    """
    Plots the moments in a pdf format in the directory
    `Outputs_PostProcessing`.

    Mean and standard deviation of every model output are drawn side by
    side (optionally against a Monte-Carlo reference) and written to one
    PDF page per output.

    Parameters
    ----------
    xlabel : str, optional
        String to be displayed as x-label. The default is `'Time [s]'`.
    plot_type : str, optional
        Options: bar or line. The default is `None` (line plot).
    save_fig : bool, optional
        Save figure or not. The default is `True`.

    Returns
    -------
    pce_means : dict
        Mean of the model outputs.
    pce_stds : dict
        Standard deviation of the model outputs.

    """
    bar_plot = True if plot_type == 'bar' else False
    meta_model_type = self.MetaModel.meta_model_type
    Model = self.MetaModel.ModelObj

    # Read Monte-Carlo reference (may be None when unavailable)
    # FIX: removed leftover debug print of the reference dict.
    self.mc_reference = Model.read_mc_reference()

    # Set the x values
    x_values_orig = self.MetaModel.ExpDesign.x_values

    # Compute the moments with the PCEModel object
    self._compute_pce_moments()

    # Get the variables
    out_names = Model.Output.names

    # Open a pdf for the plots
    if save_fig:
        newpath = (f'Outputs_PostProcessing_{self.name}/')
        if not os.path.exists(newpath):
            os.makedirs(newpath)

        # create a PdfPages object
        pdf = PdfPages(f'./{newpath}Mean_Std_PCE.pdf')

    # One page per output: mean on the left axis, std on the right
    for idx, key in enumerate(out_names):
        fig, ax = plt.subplots(nrows=1, ncols=2)

        # Extract mean and std
        mean_data = self.pce_means[key]
        std_data = self.pce_stds[key]

        # Extract a list of x values
        if type(x_values_orig) is dict:
            x = x_values_orig[key]
        else:
            x = x_values_orig

        # Plot: bar plot or line plot
        if bar_plot:
            ax[0].bar(list(map(str, x)), mean_data, color='b', width=0.25)
            ax[1].bar(list(map(str, x)), std_data, color='b', width=0.25)
            ax[0].legend(labels=[meta_model_type])
            ax[1].legend(labels=[meta_model_type])
        else:
            ax[0].plot(x, mean_data, lw=3, color='k', marker='x',
                       label=meta_model_type)
            ax[1].plot(x, std_data, lw=3, color='k', marker='x',
                       label=meta_model_type)

        if self.mc_reference is not None:
            if bar_plot:
                ax[0].bar(list(map(str, x)), self.mc_reference['mean'],
                          color='r', width=0.25)
                ax[1].bar(list(map(str, x)), self.mc_reference['std'],
                          color='r', width=0.25)
                # FIX: legend must name BOTH bar series; previously the
                # reference bars were relabeled with the surrogate name.
                ax[0].legend(labels=[meta_model_type, 'Ref.'])
                ax[1].legend(labels=[meta_model_type, 'Ref.'])
            else:
                ax[0].plot(x, self.mc_reference['mean'], lw=3, marker='x',
                           color='r', label='Ref.')
                ax[1].plot(x, self.mc_reference['std'], lw=3, marker='x',
                           color='r', label='Ref.')

        # Label the axes and provide a title
        ax[0].set_xlabel(xlabel)
        ax[1].set_xlabel(xlabel)
        ax[0].set_ylabel(key)
        ax[1].set_ylabel(key)

        # Provide a title
        ax[0].set_title('Mean of ' + key)
        ax[1].set_title('Std of ' + key)

        if not bar_plot:
            ax[0].legend(loc='best')
            ax[1].legend(loc='best')
        plt.tight_layout()

        if save_fig:
            # save the current figure
            pdf.savefig(fig, bbox_inches='tight')
            # Destroy the current plot
            plt.clf()

    # FIX: only close the PdfPages object when it was actually opened.
    if save_fig:
        pdf.close()

    return self.pce_means, self.pce_stds
def valid_metamodel(self, n_samples=1, samples=None, x_axis='Time [s]')
-
Evaluates and plots the meta model and the PCEModel outputs for the given number of samples or the given samples.
Parameters
n_samples
:int
, optional- Number of samples to be evaluated. The default is 1.
samples
:array
ofshape (n_samples, n_params)
, optional- Samples to be evaluated. The default is None.
x_axis
:str
, optional- Label of x axis. The default is
'Time [s]'
.
Returns
None.
Expand source code
def valid_metamodel(self, n_samples=1, samples=None, x_axis='Time [s]'): """ Evaluates and plots the meta model and the PCEModel outputs for the given number of samples or the given samples. Parameters ---------- n_samples : int, optional Number of samples to be evaluated. The default is 1. samples : array of shape (n_samples, n_params), optional Samples to be evaluated. The default is None. x_axis : str, optional Label of x axis. The default is `'Time [s]'`. Returns ------- None. """ MetaModel = self.MetaModel Model = MetaModel.ModelObj if samples is None: self.n_samples = n_samples samples = self._get_sample() else: self.n_samples = samples.shape[0] # Extract x_values x_values = MetaModel.ExpDesign.x_values self.model_out_dict = self._eval_model(samples, key_str='valid') self.pce_out_mean, self.pce_out_std = MetaModel.eval_metamodel(samples) try: key = Model.Output.names[1] except IndexError: key = Model.Output.names[0] n_obs = self.model_out_dict[key].shape[1] if n_obs == 1: self._plot_validation() else: self._plot_validation_multi(x_values=x_values, x_axis=x_axis)
def check_accuracy(self, n_samples=None, samples=None, outputs=None)
-
Checks accuracy of the metamodel by computing the root mean square error and validation error for all outputs.
Parameters
n_samples
:int
, optional- Number of samples. The default is None.
samples
:array
ofshape (n_samples, n_params)
, optional- Parameter sets to be checked. The default is None.
outputs
:dict
, optional- Output dictionary with model outputs for all given output types in
Model.Output.names
. The default is None.
Raises
Exception
- When neither n_samples nor samples is provided.
Returns
rmse
:dict
- Root mean squared error for each output.
valid_error
:dict
- Validation error for each output.
Expand source code
def check_accuracy(self, n_samples=None, samples=None, outputs=None):
    """
    Checks accuracy of the metamodel by computing the root mean square
    error and validation error for all outputs.

    Parameters
    ----------
    n_samples : int, optional
        Number of samples. The default is None.
    samples : array of shape (n_samples, n_params), optional
        Parameter sets to be checked. The default is None.
    outputs : dict, optional
        Output dictionary with model outputs for all given output types in
        `Model.Output.names`. The default is None.

    Raises
    ------
    Exception
        When neither n_samples nor samples is provided.

    Returns
    -------
    rmse : dict
        Root mean squared error for each output.
    valid_error : dict
        Validation error for each output.

    """
    meta_model = self.MetaModel
    model = meta_model.ModelObj

    # At least one of n_samples / samples must be supplied.
    if n_samples:
        self.n_samples = n_samples
    elif samples is not None:
        self.n_samples = samples.shape[0]
    else:
        raise Exception("Please provide either samples or pass number of "
                        "samples!")

    # Draw random parameter sets unless they were handed in.
    sample_set = samples if samples is not None else self._get_sample()

    # Run the original model only when reference outputs are missing.
    if outputs is None:
        outputs = self._eval_model(sample_set, key_str='validSet')

    # Surrogate predictions for the same parameter sets.
    pce_outputs, _ = meta_model.eval_metamodel(samples=sample_set)

    self.rmse = {}
    self.valid_error = {}
    # Per-output RMSE and variance-normalized validation error.
    for key in model.Output.names:
        # Root mean square error (one value per time/space point)
        self.rmse[key] = mean_squared_error(outputs[key], pce_outputs[key],
                                            squared=False,
                                            multioutput='raw_values')
        # Validation error: MSE scaled by sample count and output variance
        self.valid_error[key] = (self.rmse[key]**2 / self.n_samples) / \
            np.var(outputs[key], ddof=1, axis=0)

        # Report table for this output
        print("\n>>>>> Errors of {} <<<<<".format(key))
        print("\nIndex | RMSE | Validation Error")
        print('-'*35)
        rows = zip(self.rmse[key], self.valid_error[key])
        print('\n'.join(f'{i+1} | {k:.3e} | {j:.3e}'
                        for i, (k, j) in enumerate(rows)))

    # Mirror the error dicts on the metamodel object.
    self.MetaModel.rmse = self.rmse
    self.MetaModel.valid_error = self.valid_error

    return self.rmse, self.valid_error
def plot_seq_design_diagnostics(self, ref_BME_KLD=None, save_fig=True)
-
Plots the Bayesian Model Evidence (BME) and Kullback-Leibler divergence (KLD) for the sequential design.
Parameters
ref_BME_KLD
:array
, optional- Reference BME and KLD. The default is
None
. save_fig
:bool
, optional- Whether to save the figures. The default is
True
.
Returns
None.
Expand source code
def plot_seq_design_diagnostics(self, ref_BME_KLD=None, save_fig=True):
    """
    Plots the Bayesian Model Evidence (BME) and Kullback-Leibler divergence
    (KLD) for the sequential design.

    One figure is produced per diagnostic criterion (LOO error, validation
    error, KLD, BME, RMSE of mean/std, Hellinger distance). When the
    stored sequence dictionaries contain replication keys (``*_rep_N`` with
    N > 1), box plots over replications are drawn; otherwise simple
    line plots over the number of runs.

    Parameters
    ----------
    ref_BME_KLD : array, optional
        Reference BME and KLD. The default is `None`.
    save_fig : bool, optional
        Whether to save the figures. The default is `True`.

    Returns
    -------
    None.

    """
    PCEModel = self.MetaModel
    n_init_samples = PCEModel.ExpDesign.n_init_samples
    n_total_samples = PCEModel.ExpDesign.X.shape[0]

    if save_fig:
        newpath = f'Outputs_PostProcessing_{self.name}/'
        if not os.path.exists(newpath):
            os.makedirs(newpath)

        # create a PdfPages object
        pdf = PdfPages(f'./{newpath}/seqPCEModelDiagnostics.pdf')

    # Diagnostic names and the corresponding sequence dicts (parallel lists)
    plotList = ['Modified LOO error', 'Validation error', 'KLD', 'BME',
                'RMSEMean', 'RMSEStd', 'Hellinger distance']
    seqList = [PCEModel.SeqModifiedLOO, PCEModel.seqValidError,
               PCEModel.SeqKLD, PCEModel.SeqBME, PCEModel.seqRMSEMean,
               PCEModel.seqRMSEStd, PCEModel.SeqDistHellinger]

    markers = ('x', 'o', 'd', '*', '+')
    colors = ('k', 'darkgreen', 'b', 'navy', 'darkred')

    # Plot the evolution of the diagnostic criteria of the
    # Sequential Experimental Design.
    for plotidx, plot in enumerate(plotList):
        fig, ax = plt.subplots()
        seq_dict = seqList[plotidx]
        name_util = list(seq_dict.keys())

        # Skip criteria for which no sequence data was recorded
        if len(name_util) == 0:
            continue

        # Box plot when Replications have been detected.
        if any(int(name.split("rep_", 1)[1]) > 1 for name in name_util):
            # Extract the values from dict
            sorted_seq_opt = {}
            # Number of replications
            # NOTE(review): attribute name 'nReprications' (sic) is taken
            # from ExpDesign — confirm it matches the class definition.
            n_reps = PCEModel.ExpDesign.nReprications

            # Get the list of utility function names
            # Handle if only one UtilityFunction is provided
            if not isinstance(PCEModel.ExpDesign.UtilityFunction, list):
                util_funcs = [PCEModel.ExpDesign.UtilityFunction]
            else:
                util_funcs = PCEModel.ExpDesign.UtilityFunction

            # Re-sort the per-replication sequences into per-iteration
            # value arrays for every utility function
            for util in util_funcs:
                sortedSeq = {}
                # min number of runs available from reps
                n_runs = min([seq_dict[f'{util}_rep_{i+1}'].shape[0]
                              for i in range(n_reps)])

                for runIdx in range(n_runs):
                    values = []
                    for key in seq_dict.keys():
                        if util in key:
                            values.append(seq_dict[key][runIdx].mean())
                    sortedSeq['SeqItr_'+str(runIdx)] = np.array(values)
                sorted_seq_opt[util] = sortedSeq

            # BoxPlot helper: draws one box series, offset by idx so that
            # several utility functions fit side by side
            def draw_plot(data, labels, edge_color, fill_color, idx):
                pos = labels - (idx-1)
                bp = plt.boxplot(data, positions=pos, labels=labels,
                                 patch_artist=True, sym='', widths=0.75)
                elements = ['boxes', 'whiskers', 'fliers', 'means',
                            'medians', 'caps']
                for element in elements:
                    plt.setp(bp[element], color=edge_color[idx])

                for patch in bp['boxes']:
                    patch.set(facecolor=fill_color[idx])

            # x-tick step (step1) and data thinning step (step2)
            if PCEModel.ExpDesign.n_new_samples != 1:
                step1 = PCEModel.ExpDesign.n_new_samples
                step2 = 1
            else:
                step1 = 5
                step2 = 5
            edge_color = ['red', 'blue', 'green']
            fill_color = ['tan', 'cyan', 'lightgreen']
            plot_label = plot
            # Plot for different Utility Functions
            for idx, util in enumerate(util_funcs):
                # Collect all iteration columns into (n_reps, n_iters)
                all_errors = np.empty((n_reps, 0))
                for key in list(sorted_seq_opt[util].keys()):
                    errors = sorted_seq_opt.get(util, {}).get(key)[:, None]
                    all_errors = np.hstack((all_errors, errors))

                # Special cases for BME and KLD
                if plot == 'KLD' or plot == 'BME':
                    # BME convergence if refBME is provided
                    if ref_BME_KLD is not None:
                        if plot == 'BME':
                            refValue = ref_BME_KLD[0]
                            plot_label = r'$BME/BME^{Ref.}$'
                        if plot == 'KLD':
                            # NOTE(review): non-raw string — '\t' in
                            # '\theta' is a tab escape; consider r'...'.
                            plot_label = '$D_{KL}[p(\theta|y_*),p(\theta)]'\
                                ' / D_{KL}^{Ref.}[p(\theta|y_*), '\
                                'p(\theta)]$'
                            refValue = ref_BME_KLD[1]

                        # Difference between BME/KLD and the ref. values
                        all_errors = np.divide(all_errors,
                                               np.full((all_errors.shape),
                                                       refValue))

                        # Plot baseline for zero, i.e. no difference
                        plt.axhline(y=1.0, xmin=0, xmax=1, c='green',
                                    ls='--', lw=2)

                # Plot each UtilFuncs
                labels = np.arange(n_init_samples, n_total_samples+1, step1)
                draw_plot(all_errors[:, ::step2], labels, edge_color,
                          fill_color, idx)

            # NOTE(review): 'labels' leaks from the loop above — only the
            # last utility function's labels are used here.
            plt.xticks(labels, labels)
            # Set the major and minor locators
            ax.xaxis.set_major_locator(ticker.AutoLocator())
            ax.xaxis.set_minor_locator(ticker.AutoMinorLocator())
            ax.xaxis.grid(True, which='major', linestyle='-')
            ax.xaxis.grid(True, which='minor', linestyle='--')

            # Legend (one patch per utility function)
            legend_elements = []
            for idx, util in enumerate(util_funcs):
                legend_elements.append(Patch(facecolor=fill_color[idx],
                                             edgecolor=edge_color[idx],
                                             label=util))
            plt.legend(handles=legend_elements[::-1], loc='best')

            if plot != 'BME' and plot != 'KLD':
                plt.yscale('log')
            plt.autoscale(True)
            plt.xlabel('\\# of training samples')
            plt.ylabel(plot_label)
            plt.title(plot)

            if save_fig:
                # save the current figure
                pdf.savefig(fig, bbox_inches='tight')
                # Destroy the current plot
                plt.clf()

                # Save arrays into files
                f = open(f'./{newpath}/Seq{plot}.txt', 'w')
                f.write(str(sorted_seq_opt))
                f.close()
        else:
            # No replications: simple line plots over the number of runs
            for idx, name in enumerate(name_util):
                seq_values = seq_dict[name]
                if PCEModel.ExpDesign.n_new_samples != 1:
                    step = PCEModel.ExpDesign.n_new_samples
                else:
                    step = 1
                x_idx = np.arange(n_init_samples, n_total_samples+1, step)
                # Make sure the final design size appears on the axis
                if n_total_samples not in x_idx:
                    x_idx = np.hstack((x_idx, n_total_samples))

                if plot == 'KLD' or plot == 'BME':
                    # BME convergence if refBME is provided
                    if ref_BME_KLD is not None:
                        if plot == 'BME':
                            refValue = ref_BME_KLD[0]
                            plot_label = r'$BME/BME^{Ref.}$'
                        if plot == 'KLD':
                            # NOTE(review): non-raw string — '\theta'
                            # contains a tab escape; consider r'...'.
                            plot_label = '$D_{KL}[p(\theta|y_*),p(\theta)]'\
                                ' / D_{KL}^{Ref.}[p(\theta|y_*), '\
                                'p(\theta)]$'
                            refValue = ref_BME_KLD[1]

                        # Difference between BME/KLD and the ref. values
                        values = np.divide(seq_values,
                                           np.full((seq_values.shape),
                                                   refValue))

                        # Plot baseline for zero, i.e. no difference
                        plt.axhline(y=1.0, xmin=0, xmax=1, c='green',
                                    ls='--', lw=2)

                        # Set the limits
                        plt.ylim([1e-1, 1e1])

                        # Create the plots
                        plt.semilogy(x_idx, values, marker=markers[idx],
                                     color=colors[idx], ls='--', lw=2,
                                     label=name.split("_rep", 1)[0])
                    else:
                        plot_label = plot

                        # Create the plots
                        plt.plot(x_idx, seq_values, marker=markers[idx],
                                 color=colors[idx], ls='--', lw=2,
                                 label=name.split("_rep", 1)[0])
                else:
                    plot_label = plot
                    seq_values = np.nan_to_num(seq_values)

                    # Plot the error evolution for each output
                    for i in range(seq_values.shape[1]):
                        plt.semilogy(x_idx, seq_values[:, i], ls='--', lw=2,
                                     marker=markers[idx], color=colors[idx],
                                     alpha=0.15)
                    plt.semilogy(x_idx, seq_values, marker=markers[idx],
                                 ls='--', lw=2, color=colors[idx],
                                 label=name.split("_rep", 1)[0])

            # Set the major and minor locators
            ax.xaxis.set_major_locator(ticker.AutoLocator())
            ax.xaxis.set_minor_locator(ticker.AutoMinorLocator())
            ax.xaxis.grid(True, which='major', linestyle='-')
            ax.xaxis.grid(True, which='minor', linestyle='--')
            ax.tick_params(axis='both', which='major', direction='in',
                           width=3, length=10)
            ax.tick_params(axis='both', which='minor', direction='in',
                           width=2, length=8)
            plt.xlabel('Number of runs')
            plt.ylabel(plot_label)
            plt.title(plot)
            plt.legend(frameon=True)

            if save_fig:
                # save the current figure
                pdf.savefig(fig, bbox_inches='tight')
                # Destroy the current plot
                plt.clf()

                # ---------------- Saving arrays into files ---------------
                np.save(f'./{newpath}/Seq{plot}.npy', seq_values)

    # Close the pdf
    # NOTE(review): pdf is unbound here when save_fig is False — verify.
    pdf.close()

    return
def sobol_indices(self, xlabel='Time [s]', plot_type=None, save_fig=True)
-
Provides Sobol indices as a sensitivity measure to infer the importance of the input parameters. See Eq. 27 in [1] for more details. For the case with Principal component analysis refer to [2].
[1] Global sensitivity analysis: A flexible and efficient framework with an example from stochastic hydrogeology S. Oladyshkin, F.P. de Barros, W. Nowak https://doi.org/10.1016/j.advwatres.2011.11.001
[2] Nagel, J.B., Rieckermann, J. and Sudret, B., 2020. Principal component analysis and sparse polynomial chaos expansions for global sensitivity analysis and model calibration: Application to urban drainage simulation. Reliability Engineering & System Safety, 195, p.106737.
Parameters
xlabel
:str
, optional- Label of the x-axis. The default is
'Time [s]'
. plot_type
:str
, optional- Plot type. The default is
None
. This corresponds to line plot. Bar chart can be selected bybar
. save_fig
:bool
, optional- Whether to save the figures. The default is
True
.
Returns
sobol_cell
:dict
- Sobol indices.
total_sobol
:dict
- Total Sobol indices.
Expand source code
def sobol_indices(self, xlabel='Time [s]', plot_type=None, save_fig=True):
    """
    Provides Sobol indices as a sensitivity measure to infer the importance
    of the input parameters. See Eq. 27 in [1] for more details. For the
    case with Principal component analysis refer to [2].

    [1] Global sensitivity analysis: A flexible and efficient framework
    with an example from stochastic hydrogeology S. Oladyshkin, F.P. de
    Barros, W. Nowak  https://doi.org/10.1016/j.advwatres.2011.11.001

    [2] Nagel, J.B., Rieckermann, J. and Sudret, B., 2020. Principal
    component analysis and sparse polynomial chaos expansions for global
    sensitivity analysis and model calibration: Application to urban
    drainage simulation. Reliability Engineering & System Safety, 195,
    p.106737.

    Parameters
    ----------
    xlabel : str, optional
        Label of the x-axis. The default is `'Time [s]'`.
    plot_type : str, optional
        Plot type. The default is `None`. This corresponds to line plot.
        Bar chart can be selected by `bar`.
    save_fig : bool, optional
        Whether to save the figures. The default is `True`.

    Returns
    -------
    sobol_cell : dict
        Sobol indices.
    total_sobol : dict
        Total Sobol indices.

    """
    # Extract the necessary variables
    PCEModel = self.MetaModel
    basis_dict = PCEModel.basis_dict
    coeffs_dict = PCEModel.coeffs_dict
    n_params = PCEModel.n_params
    max_order = np.max(PCEModel.pce_deg)
    self.sobol_cell = {}
    self.total_sobol = {}

    # One pass per output observable
    for Output in PCEModel.ModelObj.Output.names:

        n_meas_points = len(coeffs_dict[Output])

        # Initialize the (cell) array containing the (total) Sobol indices.
        sobol_array = dict.fromkeys(range(1, max_order+1), [])
        sobol_cell_array = dict.fromkeys(range(1, max_order+1), [])

        for i_order in range(1, max_order+1):
            n_comb = math.comb(n_params, i_order)
            sobol_cell_array[i_order] = np.zeros((n_comb, n_meas_points))

        total_sobol_array = np.zeros((n_params, n_meas_points))

        # Initialize the cell to store the names of the variables
        TotalVariance = np.zeros((n_meas_points))
        # Loop over all measurement points and calculate sobol indices
        for pIdx in range(n_meas_points):

            # Extract the basis indices (alpha) and coefficients
            Basis = basis_dict[Output][f'y_{pIdx+1}']

            try:
                clf_poly = PCEModel.clf_poly[Output][f'y_{pIdx+1}']
                PCECoeffs = clf_poly.coef_
            except:
                # NOTE(review): bare except — falls back to stored
                # coefficients when no sklearn regressor is attached.
                PCECoeffs = coeffs_dict[Output][f'y_{pIdx+1}']

            # Compute total variance (sum of squared non-constant coeffs)
            TotalVariance[pIdx] = np.sum(np.square(PCECoeffs[1:]))

            nzidx = np.where(PCECoeffs != 0)[0]
            # Set all the Sobol indices equal to zero in the presence of a
            # null output.
            if len(nzidx) == 0:
                # This is buggy.
                for i_order in range(1, max_order+1):
                    sobol_cell_array[i_order][:, pIdx] = 0

            # Otherwise compute them by summing well-chosen coefficients
            else:
                nz_basis = Basis[nzidx]
                for i_order in range(1, max_order+1):
                    # Rows of the basis with exactly i_order active params
                    idx = np.where(np.sum(nz_basis > 0, axis=1) == i_order)
                    subbasis = nz_basis[idx]
                    Z = np.array(list(combinations(range(n_params), i_order)))

                    for q in range(Z.shape[0]):
                        Zq = Z[q]
                        subsubbasis = subbasis[:, Zq]
                        subidx = np.prod(subsubbasis, axis=1) > 0
                        sum_ind = nzidx[idx[0][subidx]]
                        if TotalVariance[pIdx] == 0.0:
                            sobol_cell_array[i_order][q, pIdx] = 0.0
                        else:
                            # Sobol index = partial variance / total variance
                            sobol = np.sum(np.square(PCECoeffs[sum_ind]))
                            sobol /= TotalVariance[pIdx]
                            sobol_cell_array[i_order][q, pIdx] = sobol

                # Compute the TOTAL Sobol indices.
                for ParIdx in range(n_params):
                    idx = nz_basis[:, ParIdx] > 0
                    sum_ind = nzidx[idx]

                    if TotalVariance[pIdx] == 0.0:
                        total_sobol_array[ParIdx, pIdx] = 0.0
                    else:
                        sobol = np.sum(np.square(PCECoeffs[sum_ind]))
                        sobol /= TotalVariance[pIdx]
                        total_sobol_array[ParIdx, pIdx] = sobol

            # ----- if PCA selected: Compute covariance -----
            if PCEModel.dim_red_method.lower() == 'pca':
                cov_Z_p_q = np.zeros((n_params))
                # Extract the basis indices (alpha) and coefficients for
                # next component
                if pIdx < n_meas_points-1:
                    nextBasis = basis_dict[Output][f'y_{pIdx+2}']

                    try:
                        clf_poly = PCEModel.clf_poly[Output][f'y_{pIdx+2}']
                        nextPCECoeffs = clf_poly.coef_
                    except:
                        nextPCECoeffs = coeffs_dict[Output][f'y_{pIdx+2}']

                    # Choose the common non-zero basis
                    mask = (Basis[:, None] == nextBasis).all(-1).any(-1)
                    similar_basis = Basis[mask]
                    # Compute the TOTAL Sobol indices.
                    for ParIdx in range(n_params):
                        idx = similar_basis[:, ParIdx] > 0
                        try:
                            sum_is = nzidx[idx]
                            # NOTE(review): 'sum_ind' here is left over
                            # from the loops above — confirm intended
                            # index set for this covariance.
                            cov_Z_p_q[ParIdx] = np.sum(PCECoeffs[sum_ind] *
                                                       nextPCECoeffs[sum_is])
                        except:
                            cov_Z_p_q[ParIdx] = 0.0

        # Compute the sobol indices according to Ref. 2
        if PCEModel.dim_red_method.lower() == 'pca':
            n_c_points = PCEModel.ExpDesign.Y[Output].shape[1]
            PCA = PCEModel.pca[Output]
            compPCA = PCA.components_
            nComp = compPCA.shape[0]
            var_Z_p = PCA.explained_variance_

            # Extract the sobol index of the components
            for i_order in range(1, max_order+1):
                n_comb = math.comb(n_params, i_order)
                sobol_array[i_order] = np.zeros((n_comb, n_c_points))
                Z = np.array(list(combinations(range(n_params), i_order)))

                for q in range(Z.shape[0]):
                    S_Z_i = sobol_cell_array[i_order][q]

                    for tIdx in range(n_c_points):
                        var_Y_t = np.var(
                            PCEModel.ExpDesign.Y[Output][:, tIdx])
                        if var_Y_t == 0.0:
                            term1, term2 = 0.0, 0.0
                        else:
                            # Eq. from Ref. [2]: back-transform component
                            # Sobol indices through the PCA loadings
                            term1 = np.sum([S_Z_i[i]*(var_Z_p[i]*(compPCA[i, tIdx]**2)/var_Y_t) for i in range(nComp)])

                            # Term 2
                            # cov_Z_p_q = np.ones((nComp))# TODO: from coeffs
                            Phi_t_p = compPCA[:nComp-1]
                            Phi_t_q = compPCA
                            term2 = 2 * np.sum([cov_Z_p_q[ParIdx] * Phi_t_p[i,tIdx] * Phi_t_q[i,tIdx]/var_Y_t for i in range(nComp-1)])

                        # NOTE(review): term2 deliberately excluded below.
                        sobol_array[i_order][q, tIdx] = term1 #+ term2

            # Compute the TOTAL Sobol indices.
            total_sobol = np.zeros((n_params, n_c_points))
            for ParIdx in range(n_params):
                S_Z_i = total_sobol_array[ParIdx]

                for tIdx in range(n_c_points):
                    var_Y_t = np.var(PCEModel.ExpDesign.Y[Output][:, tIdx])
                    if var_Y_t == 0.0:
                        term1, term2 = 0.0, 0.0
                    else:
                        term1 = 0
                        for i in range(nComp):
                            term1 += S_Z_i[i] * var_Z_p[i] * \
                                (compPCA[i, tIdx]**2) / var_Y_t

                        # Term 2
                        # cov_Z_p_q = np.ones((nComp))# TODO: from coeffs
                        Phi_t_p = compPCA[:nComp-1]
                        Phi_t_q = compPCA
                        term2 = 0
                        for i in range(nComp-1):
                            term2 += cov_Z_p_q[ParIdx] * Phi_t_p[i, tIdx] \
                                * Phi_t_q[i, tIdx] / var_Y_t
                        term2 *= 2

                    total_sobol[ParIdx, tIdx] = term1 + term2

            self.sobol_cell[Output] = sobol_array
            self.total_sobol[Output] = total_sobol
        else:
            self.sobol_cell[Output] = sobol_cell_array
            self.total_sobol[Output] = total_sobol_array

    # ---------------- Plot -----------------------
    par_names = PCEModel.ExpDesign.par_names
    x_values_orig = PCEModel.ExpDesign.x_values

    cases = ['']
    for case in cases:
        newpath = (f'Outputs_PostProcessing_{self.name}/')
        if not os.path.exists(newpath):
            os.makedirs(newpath)

        if save_fig:
            # create a PdfPages object
            name = case+'_' if 'Valid' in cases else ''
            pdf = PdfPages('./'+newpath+name+'Sobol_indices.pdf')

        fig = plt.figure()

        for outIdx, Output in enumerate(PCEModel.ModelObj.Output.names):

            # Extract total Sobol indices
            total_sobol = self.total_sobol[Output]

            # Extract a list of x values
            if type(x_values_orig) is dict:
                x = x_values_orig[Output]
            else:
                x = x_values_orig

            if plot_type == 'bar':
                ax = fig.add_axes([0, 0, 1, 1])
                dict1 = {xlabel: x}
                dict2 = {param: sobolIndices for param, sobolIndices
                         in zip(par_names, total_sobol)}
                df = pd.DataFrame({**dict1, **dict2})
                df.plot(x=xlabel, y=par_names, kind="bar", ax=ax, rot=0,
                        colormap='Dark2')
                ax.set_ylabel('Total Sobol indices, $S^T$')
            else:
                for i, sobolIndices in enumerate(total_sobol):
                    plt.plot(x, sobolIndices, label=par_names[i],
                             marker='x', lw=2.5)

                plt.ylabel('Total Sobol indices, $S^T$')
                plt.xlabel(xlabel)

            plt.title(f'Sensitivity analysis of {Output}')
            if plot_type != 'bar':
                plt.legend(loc='best', frameon=True)

            # Save indices as CSV (one column per parameter)
            np.savetxt(f'./{newpath}{name}totalsobol_' +
                       Output.replace('/', '_') + '.csv',
                       total_sobol.T, delimiter=',',
                       header=','.join(par_names), comments='')

            if save_fig:
                # save the current figure
                pdf.savefig(fig, bbox_inches='tight')
                # Destroy the current plot
                plt.clf()

        # NOTE(review): pdf is unbound here when save_fig is False.
        pdf.close()

    return self.sobol_cell, self.total_sobol
def check_reg_quality(self, n_samples=1000, samples=None, save_fig=True)
-
Checks the quality of the metamodel for single output models based on: https://towardsdatascience.com/how-do-you-check-the-quality-of-your-regression-model-in-python-fa61759ff685
Parameters
n_samples
:int
, optional- Number of parameter sets to use for the check. The default is 1000.
samples
:array
ofshape (n_samples, n_params)
, optional- Parameter sets to use for the check. The default is None.
save_fig
:bool
, optional- Whether to save the figures. The default is True.
Returns
None.
Expand source code
def check_reg_quality(self, n_samples=1000, samples=None, save_fig=True):
    """
    Checks the quality of the metamodel for single output models based on:
    https://towardsdatascience.com/how-do-you-check-the-quality-of-your-regression-model-in-python-fa61759ff685

    Four diagnostic plots are produced per output: residuals vs. the
    true values, fitted values vs. residuals, a histogram of the
    normalized residuals (annotated with a Shapiro-Wilk normality test),
    and a Q-Q plot of the normalized residuals.

    Parameters
    ----------
    n_samples : int, optional
        Number of parameter sets to use for the check. The default is 1000.
    samples : array of shape (n_samples, n_params), optional
        Parameter sets to use for the check. The default is None.
    save_fig : bool, optional
        Whether to save the figures. The default is True.

    Returns
    -------
    None.

    """
    MetaModel = self.MetaModel

    if samples is None:
        self.n_samples = n_samples
        samples = self._get_sample()
    else:
        self.n_samples = samples.shape[0]

    # Evaluate the original and the surrogate model
    y_val = self._eval_model(samples, key_str='valid')
    y_pce_val, _ = MetaModel.eval_metamodel(samples=samples)

    # Open a pdf for the plots
    if save_fig:
        newpath = (r'Outputs_PostProcessing_{0}/'.format(self.name))
        if not os.path.exists(newpath):
            os.makedirs(newpath)

    # Fit the data(train the model)
    for key in y_pce_val.keys():

        y_pce_val_ = y_pce_val[key]
        y_val_ = y_val[key]

        # ------ Residuals vs. predicting variables ------
        # Check the assumptions of linearity and independence
        fig1 = plt.figure()
        plt.title(key+": Residuals vs. Predicting variables")
        residuals = y_val_ - y_pce_val_
        plt.scatter(x=y_val_, y=residuals, color='blue', edgecolor='k')
        plt.grid(True)
        xmin, xmax = min(y_val_), max(y_val_)
        plt.hlines(y=0, xmin=xmin*0.9, xmax=xmax*1.1, color='red', lw=3,
                   linestyle='--')
        plt.xlabel(key)
        plt.ylabel('Residuals')
        plt.show()

        if save_fig:
            # save the current figure
            fig1.savefig(f'./{newpath}/Residuals_vs_PredVariables.pdf',
                         bbox_inches='tight')
            # Destroy the current plot
            plt.clf()

        # ------ Fitted vs. residuals ------
        # Check the assumptions of linearity and independence
        fig2 = plt.figure()
        # FIX: title was a copy-paste of the previous plot's title.
        plt.title(key+": Fitted vs. residuals")
        plt.scatter(x=y_pce_val_, y=residuals, color='blue', edgecolor='k')
        plt.grid(True)
        xmin, xmax = min(y_val_), max(y_val_)
        plt.hlines(y=0, xmin=xmin*0.9, xmax=xmax*1.1, color='red', lw=3,
                   linestyle='--')
        plt.xlabel(key)
        plt.ylabel('Residuals')
        plt.show()

        if save_fig:
            # save the current figure
            fig2.savefig(f'./{newpath}/Fitted_vs_Residuals.pdf',
                         bbox_inches='tight')
            # Destroy the current plot
            plt.clf()

        # ------ Histogram of normalized residuals ------
        fig3 = plt.figure()
        resid_pearson = residuals / (max(residuals)-min(residuals))
        plt.hist(resid_pearson, bins=20, edgecolor='k')
        plt.ylabel('Count')
        plt.xlabel('Normalized residuals')
        plt.title(f"{key}: Histogram of normalized residuals")

        # Normality (Shapiro-Wilk) test of the residuals
        ax = plt.gca()
        _, p = stats.shapiro(residuals)
        # FIX: interpretation was inverted. Shapiro-Wilk's null hypothesis
        # is that the data come from a normal distribution; a SMALL p-value
        # rejects normality, a large one is consistent with it.
        if p < 0.01:
            annText = "The normality assumption may not hold."
        else:
            annText = "The residuals seem to come from Gaussian process."
        at = AnchoredText(annText, prop=dict(size=30), frameon=True,
                          loc='upper left')
        at.patch.set_boxstyle("round,pad=0.,rounding_size=0.2")
        ax.add_artist(at)
        plt.show()

        if save_fig:
            # save the current figure
            fig3.savefig(f'./{newpath}/Hist_NormResiduals.pdf',
                         bbox_inches='tight')
            # Destroy the current plot
            plt.clf()

        # ------ Q-Q plot of the normalized residuals ------
        plt.figure()
        # FIX: fit expects a bool; the string 'True' only worked because it
        # is truthy.
        fig4 = qqplot(resid_pearson, line='45', fit=True)
        plt.xticks()
        plt.yticks()
        plt.xlabel("Theoretical quantiles")
        plt.ylabel("Sample quantiles")
        plt.title(key+": Q-Q plot of normalized residuals")
        plt.grid(True)
        plt.show()

        if save_fig:
            # save the current figure
            fig4.savefig(f'./{newpath}/QQPlot_NormResiduals.pdf',
                         bbox_inches='tight')
            # Destroy the current plot
            plt.clf()
def eval_pce_model_3d(self, save_fig=True)
-
Expand source code
def eval_pce_model_3d(self, save_fig=True):
    """
    Evaluates the PCE metamodel and the original model on a fixed 2D
    parameter grid and draws both response surfaces as 3D plots.

    Parameters
    ----------
    save_fig : bool, optional
        Whether to save the figures as PDF files. The default is True.

    Returns
    -------
    None.

    """
    self.n_samples = 1000

    PCEModel = self.MetaModel
    Model = self.MetaModel.ModelObj
    n_samples = self.n_samples

    # Create 3D-Grid
    # TODO: Make it general
    x = np.linspace(-5, 10, n_samples)
    y = np.linspace(0, 15, n_samples)

    X, Y = np.meshgrid(x, y)
    PCE_Z = np.zeros((self.n_samples, self.n_samples))
    Model_Z = np.zeros((self.n_samples, self.n_samples))

    # Evaluate the surrogate and the model column by column over the grid
    for idxMesh in range(self.n_samples):
        sample_mesh = np.vstack((X[:, idxMesh], Y[:, idxMesh])).T

        univ_p_val = PCEModel.univ_basis_vals(sample_mesh)

        for Outkey, ValuesDict in PCEModel.coeffs_dict.items():

            pce_out_mean = np.zeros((len(sample_mesh), len(ValuesDict)))
            pce_out_std = np.zeros((len(sample_mesh), len(ValuesDict)))
            model_outs = np.zeros((len(sample_mesh), len(ValuesDict)))

            for Inkey, InIdxValues in ValuesDict.items():
                idx = int(Inkey.split('_')[1]) - 1
                basis_deg_ind = PCEModel.basis_dict[Outkey][Inkey]
                clf_poly = PCEModel.clf_poly[Outkey][Inkey]

                PSI_Val = PCEModel.create_psi(basis_deg_ind, univ_p_val)

                # Prediction with error bar
                y_mean, y_std = clf_poly.predict(PSI_Val, return_std=True)

                pce_out_mean[:, idx] = y_mean
                pce_out_std[:, idx] = y_std

                # Model evaluation
                model_out_dict, _ = Model.run_model_parallel(
                    sample_mesh, key_str='Valid3D')
                model_outs[:, idx] = model_out_dict[Outkey].T

            PCE_Z[:, idxMesh] = y_mean
            Model_Z[:, idxMesh] = model_outs[:, 0]

    # ---------------- 3D plot for PCEModel -----------------------
    fig_PCE = plt.figure()
    ax = plt.axes(projection='3d')
    ax.plot_surface(X, Y, PCE_Z, rstride=1, cstride=1,
                    cmap='viridis', edgecolor='none')
    ax.set_title('PCEModel')
    ax.set_xlabel('$x_1$')
    ax.set_ylabel('$x_2$')
    ax.set_zlabel('$f(x_1,x_2)$')

    plt.grid()
    plt.show()

    if save_fig:
        #  Saving the figure
        newpath = f'Outputs_PostProcessing_{self.name}/'
        if not os.path.exists(newpath):
            os.makedirs(newpath)

        # save the figure to file
        fig_PCE.savefig(f'./{newpath}/3DPlot_PCEModel.pdf',
                        format="pdf", bbox_inches='tight')
        plt.close(fig_PCE)

    # ---------------- 3D plot for Model -----------------------
    fig_Model = plt.figure()
    ax = plt.axes(projection='3d')
    # FIX: this surface previously plotted PCE_Z again (copy-paste bug),
    # so both figures showed the surrogate; plot the model response here.
    ax.plot_surface(X, Y, Model_Z, rstride=1, cstride=1,
                    cmap='viridis', edgecolor='none')
    ax.set_title('Model')
    ax.set_xlabel('$x_1$')
    ax.set_ylabel('$x_2$')
    ax.set_zlabel('$f(x_1,x_2)$')
    plt.grid()
    plt.show()

    if save_fig:
        # Save the figure
        fig_Model.savefig(f'./{newpath}/3DPlot_Model.pdf',
                          format="pdf", bbox_inches='tight')
        plt.close(fig_Model)

    return