diff --git a/.gitignore b/.gitignore
index 57618a482467ec8686bf7be2e8156ce669a6c8f7..d717f5ea80850a8732e6c88ea0c5bcdf821fcd37 100644
--- a/.gitignore
+++ b/.gitignore
@@ -35,10 +35,8 @@ Outputs_*
 # Ignore test results
 .coverage*
 
-<<<<<<< HEAD
 # Ignore folders for test-builds of the gitlab-pages
 public/*
 
 # Ignore files for vscode
 .vscode/*
->>>>>>> 728df39b (Update PostProcessing tests)
diff --git a/__pycache__/__init__.cpython-311.pyc b/__pycache__/__init__.cpython-311.pyc
index a50469ed17864f696db773fddcadf9eca9a3dc9d..a56ee1f6b4dae7b5e88e838093159afdbc581359 100644
Binary files a/__pycache__/__init__.cpython-311.pyc and b/__pycache__/__init__.cpython-311.pyc differ
diff --git a/examples/analytical-function/example_analytical_function.py b/examples/analytical-function/example_analytical_function.py
index 481fcfcabdf1386557e3ea923c2b62f8e3a4e3dd..399d8566645b855a1898ab8fb844000defc9181d 100644
--- a/examples/analytical-function/example_analytical_function.py
+++ b/examples/analytical-function/example_analytical_function.py
@@ -229,7 +229,7 @@ if __name__ == "__main__":
     # =====================================================
     # ========= POST PROCESSING OF METAMODELS ===========
     # =====================================================
-    PostPCE = PostProcessing(engine)
+    PostPCE = PostProcessing(engine, out_format="png")
 
     # Plot to check validation visually.
     PostPCE.valid_metamodel(n_samples=1)
diff --git a/examples/beam/Beam9points_1/SSBeam_Deflection_1.inp b/examples/beam/Beam9points_1/SSBeam_Deflection_1.inp
index fec3b20b0b63491c95e1deab597784c31448405f..594ec4fa58147dfc297ec5df6064405d217bbabc 100644
--- a/examples/beam/Beam9points_1/SSBeam_Deflection_1.inp
+++ b/examples/beam/Beam9points_1/SSBeam_Deflection_1.inp
@@ -1,6 +1,6 @@
 % Input file for the simply supported beam model
-1.5557e-01 % b in m
-2.8052e-01 % h in m
+1.2759e-01 % b in m
+2.8948e-01 % h in m
 5 % L in m
-2.2558e+10 % E in Pa
-9.2858e+03 % p in N/m
+2.1572e+10 % E in Pa
+9.2437e+03 % p in N/m
diff --git a/examples/ishigami/example_ishigami.py b/examples/ishigami/example_ishigami.py
index 20a08cca675ab2a49fc74a21c807ee049d9f502f..19aac946ab8ec3c7131f4a39849905e23531ae53 100644
--- a/examples/ishigami/example_ishigami.py
+++ b/examples/ishigami/example_ishigami.py
@@ -163,7 +163,8 @@ if __name__ == "__main__":
     MetaModelOpts.ExpDesign = ExpDesign
     engine = Engine(MetaModelOpts, Model, ExpDesign)
     engine.start_engine()
-    engine.train_sequential()
+    # engine.train_sequential()
+    engine.train_normal()
 
     # Save PCE models
     with open(f'PCEModel_{Model.name}.pkl', 'wb') as output:
diff --git a/src/bayesvalidrox/post_processing/post_processing.py b/src/bayesvalidrox/post_processing/post_processing.py
index 1ab736ec99ee1fcf4c2045f211d0c10c48f29198..37ce3d04ba00a719f6e38e6fd89ef675da7851e5 100644
--- a/src/bayesvalidrox/post_processing/post_processing.py
+++ b/src/bayesvalidrox/post_processing/post_processing.py
@@ -1,27 +1,27 @@
 #!/usr/bin/env python3
 # -*- coding: utf-8 -*-
+"""
+Collection of postprocessing functions into a class.
+""" -import math import os from itertools import combinations, cycle import numpy as np import pandas as pd from scipy import stats -from sklearn.linear_model import LinearRegression -from sklearn.metrics import mean_squared_error, r2_score +from sklearn.metrics import r2_score import matplotlib.pyplot as plt from matplotlib import ticker from matplotlib.offsetbox import AnchoredText from matplotlib.patches import Patch -# Load the mplstyle -plt.style.use(os.path.join(os.path.split(__file__)[0], - '../', 'bayesvalidrox.mplstyle')) +from bayesvalidrox.surrogate_models.supplementary import root_mean_squared_error + +plt.style.use(os.path.join(os.path.split(__file__)[0], "../", "bayesvalidrox.mplstyle")) class PostProcessing: """ - This class provides many helper functions to post-process the trained - meta-model. + This class provides post-processing functions for the trained metamodels. Parameters ---------- @@ -31,13 +31,54 @@ class PostProcessing: Name of the PostProcessing object to be used for saving the generated files. The default is 'calib'. out_dir : string - Output directory in which the images are placed. The default is ''. + Output directory in which the PostProcessing results are placed. + The results are contained in a subfolder '/Outputs_PostProcessing_name' + The default is ''. + out_format : string + Format of the saved plots. Supports 'png' and 'pdf'. The default is 'pdf'. + + Raises + ------ + AttributeError + `engine` must be trained. """ - def __init__(self, engine, name='calib', out_dir=''): + def __init__(self, engine, name="calib", out_dir="", out_format="pdf"): + # PostProcessing only available for trained engines + if not engine.trained: + raise AttributeError( + "PostProcessing can only be performed on trained engines." + ) + + if not engine.MetaModel: + raise AttributeError( + "PostProcessing can only be performed on engines with a trained MetaModel." + ) + self.engine = engine self.name = name + self.out_format = out_format + self.par_names = self.engine.ExpDesign.par_names + self.x_values = self.engine.ExpDesign.x_values + + self.out_dir = f"./{out_dir}/Outputs_PostProcessing_{self.name}/" + + # Open a pdf for the plots + if not os.path.exists(self.out_dir): + os.makedirs(self.out_dir) + + # Initialize attributes + self.plot_type = "" + self.xlabel = "Time [s]" + self.mc_reference = None + self.sobol = None + self.totalsobol = None + self.valid_error = None + self.rmse = None + self.model_out_dict = {} + self.means = None + self.stds = None self.out_dir = f'./{out_dir}/Outputs_PostProcessing_{self.name}/' @@ -59,17 +100,21 @@ class PostProcessing: self.pce_out_std = None # ------------------------------------------------------------------------- - def plot_moments(self, xlabel: str = 'Time [s]', plot_type: str = None): + def plot_moments(self, plot_type: str = "line"): """ - Plots the moments in a pdf format in the directory + Plots the moments in a user defined output format (standard is pdf) in the directory `Outputs_PostProcessing`. Parameters ---------- - xlabel : str, optional - String to be displayed as x-label. The default is `'Time [s]'`. plot_type : str, optional - Options: bar or line. The default is `None`. + Supports 'bar' for barplots and 'line' + for lineplots The default is `line`. + + Raises + ------ + AttributeError + Plot type must be 'bar' or 'line'. Returns ------- @@ -79,24 +124,18 @@ class PostProcessing: Standard deviation of the model outputs. 
""" - - bar_plot = True if plot_type == 'bar' else False - meta_model_type = self.engine.MetaModel.meta_model_type + if plot_type not in ["bar", "line"]: + raise AttributeError("The wanted plot-type is not supported.") + bar_plot = bool(plot_type == "bar") meta_model_type = self.engine.MetaModel.meta_model_type # Read Monte-Carlo reference - self.mc_reference = self.engine.Model.read_observation('mc_ref') - self.mc_reference = self.engine.Model.read_observation('mc_ref') - - # Set the x values - x_values_orig = self.engine.ExpDesign.x_values + self.mc_reference = self.engine.Model.read_observation("mc_ref") # Compute the moments with the PCEModel object - self.means, self.stds = self.engine.MetaModel.compute_moments() + self.means, self.stds = self.engine.MetaModel.calculate_moments() - # Plot the best fit line, set the linewidth (lw), color and - # transparency (alpha) of the line - for key in self.engine.out_names: + # Plot the best fit line for key in self.engine.out_names: fig, ax = plt.subplots(nrows=1, ncols=2) @@ -104,67 +143,91 @@ class PostProcessing: mean_data = self.means[key] std_data = self.stds[key] - # Extract a list of x values - if isinstance(x_values_orig, dict): - x = x_values_orig[key] - else: - x = x_values_orig - # Plot: bar plot or line plot if bar_plot: - ax[0].bar(list(map(str, x)), mean_data, color='b', - width=0.25) - ax[1].bar(list(map(str, x)), std_data, color='b', - width=0.25) + ax[0].bar( + list(map(str, self.x_values)), mean_data, color="b", width=0.25 + ) + ax[1].bar( + list(map(str, self.x_values)), std_data, color="b", width=0.25 + ) ax[0].legend(labels=[meta_model_type]) ax[1].legend(labels=[meta_model_type]) else: - ax[0].plot(x, mean_data, lw=3, color='k', marker='x', - label=meta_model_type) - ax[1].plot(x, std_data, lw=3, color='k', marker='x', - label=meta_model_type) + ax[0].plot( + self.x_values, + mean_data, + lw=3, + color="k", + marker="x", + label=meta_model_type, + ) + ax[1].plot( + self.x_values, + std_data, + lw=3, + color="k", + marker="x", + label=meta_model_type, + ) if self.mc_reference is not None: if bar_plot: - ax[0].bar(list(map(str, x)), self.mc_reference['mean'], - color='r', width=0.25) - ax[1].bar(list(map(str, x)), self.mc_reference['std'], - color='r', width=0.25) + ax[0].bar( + list(map(str, self.x_values)), + self.mc_reference["mean"], + color="r", + width=0.25, + ) + ax[1].bar( + list(map(str, self.x_values)), + self.mc_reference["std"], + color="r", + width=0.25, + ) ax[0].legend(labels=[meta_model_type]) ax[1].legend(labels=[meta_model_type]) else: - ax[0].plot(x, self.mc_reference['mean'], lw=3, marker='x', - color='r', label='Ref.') - ax[1].plot(x, self.mc_reference['std'], lw=3, marker='x', - color='r', label='Ref.') + ax[0].plot( + self.x_values, + self.mc_reference["mean"], + lw=3, + marker="x", + color="r", + label="Ref.", + ) + ax[1].plot( + self.x_values, + self.mc_reference["std"], + lw=3, + marker="x", + color="r", + label="Ref.", + ) # Label the axes and provide a title - ax[0].set_xlabel(xlabel) - ax[1].set_xlabel(xlabel) + ax[0].set_xlabel(self.xlabel) + ax[1].set_xlabel(self.xlabel) ax[0].set_ylabel(key) ax[1].set_ylabel(key) # Provide a title - ax[0].set_title('Mean of ' + key) - ax[1].set_title('Std of ' + key) + ax[0].set_title("Mean of " + key) + ax[1].set_title("Std of " + key) if not bar_plot: - ax[0].legend(loc='best') - ax[1].legend(loc='best') - + ax[0].legend(loc="best") + ax[1].legend(loc="best") plt.tight_layout() - - # save the current figure fig.savefig( - 
f'{self.out_dir}Mean_Std_PCE_{key}.pdf',
-                bbox_inches='tight'
+                f"{self.out_dir}Mean_Std_PCE_{key}.{self.out_format}",
+                bbox_inches="tight",
             )
 
         return self.means, self.stds
 
     # -------------------------------------------------------------------------
-    def valid_metamodel(self, n_samples=1, samples=None, model_out_dict=None,
-                        x_axis='Time [s]') -> None:
+    def valid_metamodel(self, n_samples=1, samples=None, model_out_dict=None) -> None:
         """
         Evaluates and plots the meta model and the PCEModel outputs for the
         given number of samples or the given samples.
@@ -177,38 +240,38 @@ class PostProcessing:
             Samples to be evaluated. The default is None.
         model_out_dict: dict
             The model runs using the samples provided.
-        x_axis : str, optional
-            Label of x axis. The default is `'Time [s]'`.
 
+        Returns
+        -------
+        None
         """
         if samples is None:
-            samples = self._get_sample(n_samples)
+            samples = self.engine.ExpDesign.generate_samples(
+                n_samples, sampling_method="random"
+            )
         else:
             n_samples = samples.shape[0]
 
-        # Extract x_values
-        x_values = self.engine.ExpDesign.x_values
+        if model_out_dict is None:
+            model_out_dict, _ = self.engine.Model.run_model_parallel(
+                samples, key_str="valid"
+            )
 
-        if model_out_dict is not None:
-            self.model_out_dict = model_out_dict
-        else:
-            self.model_out_dict = self._eval_model(samples, key_str='valid')
-        self.pce_out_mean, self.pce_out_std = self.engine.eval_metamodel(samples)
+        out_mean, out_std = self.engine.eval_metamodel(samples)
 
-        try:
-            key = self.engine.out_names[1]
-        except IndexError:
-            key = self.engine.out_names[0]
+        self._plot_validation_multi(out_mean, out_std, model_out_dict)
 
-        n_obs = self.model_out_dict[key].shape[1]
-
-        if n_obs == 1:
-            self._plot_validation(samples)
-        else:
-            self._plot_validation_multi(x_values=x_values, x_axis=x_axis)
 
+        # TODO: should this be kept here?
         # Zip the subdirectories
         self.engine.Model.zip_subdirs(
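Review note: the reworked post-processing API no longer takes axis labels and runs the model itself when no outputs are passed in. A minimal usage sketch against the interface introduced in this patch (assumes `engine` is a trained `Engine`, as the new constructor check requires):

```python
# Sketch only; exercises the PostProcessing API from this patch.
from bayesvalidrox.post_processing.post_processing import PostProcessing

post = PostProcessing(engine, name="calib", out_format="png")
post.plot_moments(plot_type="line")   # writes Mean_Std_PCE_<output>.png
post.valid_metamodel(n_samples=3)     # runs the model and plots metamodel vs. model
post.check_accuracy(n_samples=100)    # fills post.rmse and post.valid_error
```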
@@ -228,25 +291,34 @@ class PostProcessing:
             Parameter sets to be checked. The default is None.
         outputs : dict, optional
             Output dictionary with model outputs for all given output types in
-            `Model.Output.names`. The default is None.
+            `engine.out_names`. The default is None.
 
         Raises
         ------
         AttributeError
             When neither n_samples nor samples are provided.
 
+        Returns
+        -------
+        None
+
         """
         # Set the number of samples
         if samples is None and n_samples is None:
-            raise AttributeError("Please provide either samples or pass the number"
-                                 " of samples!")
+            raise AttributeError(
+                "Please provide either samples or pass the number of samples!"
+            )
 
         n_samples = samples.shape[0] if samples is not None else n_samples
 
         # Generate random samples if necessary
-        samples = self._get_sample(n_samples) if samples is None else samples
+        if samples is None:
+            samples = self.engine.ExpDesign.generate_samples(
+                n_samples, sampling_method="random"
+            )
 
         # Run the original model with the generated samples
-        outputs = self._eval_model(samples, key_str='validSet') if outputs is None else outputs
+        if outputs is None:
+            outputs, _ = self.engine.Model.run_model_parallel(samples, key_str="valid")
 
         # Run the PCE model with the generated samples
         metamod_outputs, _ = self.engine.eval_metamodel(samples)
 
         self.rmse = {}
         self.valid_error = {}
         # Loop over the keys and compute RMSE error.
-        for key in self.engine.out_names:
         for key in self.engine.out_names:
             # Root mean squared error
-            self.rmse[key] = mean_squared_error(outputs[key], metamod_outputs[key],
-                                                squared=False,
-                                                multioutput='raw_values')
+            self.rmse[key] = root_mean_squared_error(outputs[key], metamod_outputs[key])
             # Validation error
-            self.valid_error[key] = (self.rmse[key]**2) / \
-                np.var(outputs[key], ddof=1, axis=0)
+            self.valid_error[key] = (self.rmse[key] ** 2) / np.var(
+                outputs[key], ddof=1, axis=0
+            )
 
             # Print a report table
             print(f"\n>>>>> Errors of {key} <<<<<")
             print("\nIndex  |  RMSE  |  Validation Error")
-            print('-'*35)
-            print('\n'.join(f'{i+1}  |  {k:.3e}  |  {j:.3e}' for i, (k, j)
-                            in enumerate(zip(self.rmse[key],
-                                             self.valid_error[key]))))
-        # Save error dicts in PCEModel object
-        self.engine.MetaModel.rmse = self.rmse
-        self.engine.MetaModel.valid_error = self.valid_error
+            print("-" * 35)
+            print(
+                "\n".join(
+                    f"{i+1}  |  {k:.3e}  |  {j:.3e}"
+                    for i, (k, j) in enumerate(
+                        zip(self.rmse[key], self.valid_error[key])
+                    )
+                )
+            )
+        # Save error dicts in MetaModel object
         self.engine.MetaModel.rmse = self.rmse
         self.engine.MetaModel.valid_error = self.valid_error
 
     # -------------------------------------------------------------------------
-    def plot_seq_design_diagnostics(self, ref_BME_KLD=None) -> None:
+    def plot_seq_design_diagnostics(self, ref_bme_kld=None) -> None:
         """
         Plots the Bayesian Model Evidence (BME) and Kullback-Leibler divergence
         (KLD) for the sequential design.
 
         Parameters
         ----------
-        ref_BME_KLD : array, optional
+        ref_bme_kld : array, optional
             Reference BME and KLD. The default is `None`.
 
+        Returns
+        -------
+        None
+
         """
         engine = self.engine
         n_init_samples = engine.ExpDesign.n_init_samples
         n_total_samples = engine.ExpDesign.X.shape[0]
 
-        # TODO: move this one!
- newpath = f'Outputs_PostProcessing_{self.name}/seq_design_diagnostics/' + newpath = f"{self.out_dir}/seq_design_diagnostics/" if not os.path.exists(newpath): os.makedirs(newpath) - plotList = ['Modified LOO error', 'Validation error', 'KLD', 'BME', - 'RMSEMean', 'RMSEStd', 'Hellinger distance'] - seqList = [engine.SeqModifiedLOO, engine.seqValidError, - engine.SeqKLD, engine.SeqBME, engine.seqRMSEMean, - engine.seqRMSEStd, engine.SeqDistHellinger] - - markers = ('x', 'o', 'd', '*', '+') - colors = ('k', 'darkgreen', 'b', 'navy', 'darkred') + plot_list = [ + "Modified LOO error", + "Validation error", + "KLD", + "BME", + "RMSEMean", + "RMSEStd", + "Hellinger distance", + ] + seq_list = [ + engine.SeqModifiedLOO, + engine.seqValidError, + engine.SeqKLD, + engine.SeqBME, + engine.seqRMSEMean, + engine.seqRMSEStd, + engine.SeqDistHellinger, + ] + + markers = ("x", "o", "d", "*", "+") + colors = ("k", "darkgreen", "b", "navy", "darkred") # Plot the evolution of the diagnostic criteria of the # Sequential Experimental Design. - for plotidx, plot in enumerate(plotList): - fig, ax = plt.subplots() - seq_dict = seqList[plotidx] - name_util = list(seq_dict.keys()) + for plotidx, plot in enumerate(plot_list): + fig, ax = plt.subplots() + seq_dict = seq_list[plotidx] + name_util = list(seq_dict.keys()) # TODO: same as engine.out_names? if len(name_util) == 0: continue # Box plot when Replications have been detected. if any(int(name.split("rep_", 1)[1]) > 1 for name in name_util): - # Extract the values from dict sorted_seq_opt = {} - # Number of replications n_reps = engine.ExpDesign.n_replication - # Get the list of utility function names # Handle if only one UtilityFunction is provided if not isinstance(engine.ExpDesign.util_func, list): util_funcs = [engine.ExpDesign.util_func] @@ -332,30 +418,42 @@ class PostProcessing: util_funcs = engine.ExpDesign.util_func for util in util_funcs: - sortedSeq = {} - # min number of runs available from reps - n_runs = min([seq_dict[f'{util}_rep_{i+1}'].shape[0] - for i in range(n_reps)]) + sorted_seq = {} + n_runs = min( + seq_dict[f"{util}_rep_{i + 1}"].shape[0] for i in range(n_reps) + ) - for runIdx in range(n_runs): + for run_idx in range(n_runs): values = [] for key in seq_dict.keys(): if util in key: - values.append(seq_dict[key][runIdx].mean()) - sortedSeq['SeqItr_'+str(runIdx)] = np.array(values) - sorted_seq_opt[util] = sortedSeq + values.append(seq_dict[key][run_idx].mean()) + sorted_seq["SeqItr_" + str(run_idx)] = np.array(values) + sorted_seq_opt[util] = sorted_seq # BoxPlot def draw_plot(data, labels, edge_color, fill_color, idx): - pos = labels - (idx-1) - bp = plt.boxplot(data, positions=pos, labels=labels, - patch_artist=True, sym='', widths=0.75) - elements = ['boxes', 'whiskers', 'fliers', 'means', - 'medians', 'caps'] + pos = labels - (idx - 1) + bp = plt.boxplot( + data, + positions=pos, + labels=labels, + patch_artist=True, + sym="", + widths=0.75, + ) + elements = [ + "boxes", + "whiskers", + "fliers", + "means", + "medians", + "caps", + ] for element in elements: plt.setp(bp[element], color=edge_color[idx]) - for patch in bp['boxes']: + for patch in bp["boxes"]: patch.set(facecolor=fill_color[idx]) if engine.ExpDesign.n_new_samples != 1: @@ -364,8 +462,8 @@ class PostProcessing: else: step1 = 5 step2 = 5 - edge_color = ['red', 'blue', 'green'] - fill_color = ['tan', 'cyan', 'lightgreen'] + edge_color = ["red", "blue", "green"] + fill_color = ["tan", "cyan", "lightgreen"] plot_label = plot # Plot for different Utility Functions for idx, 
util in enumerate(util_funcs): @@ -376,65 +474,71 @@ class PostProcessing: all_errors = np.hstack((all_errors, errors)) # Special cases for BME and KLD - if plot == 'KLD' or plot == 'BME': + if plot in ["KLD", "BME"]: # BME convergence if refBME is provided - if ref_BME_KLD is not None: - if plot == 'BME': - refValue = ref_BME_KLD[0] - plot_label = r'BME/BME$^{Ref.}$' - if plot == 'KLD': - refValue = ref_BME_KLD[1] - plot_label = '$D_{KL}[p(\\theta|y_*),p(\\theta)]'\ - ' / D_{KL}^{Ref.}[p(\\theta|y_*), '\ - 'p(\\theta)]$' + if ref_bme_kld is not None: + ref_value = None + if plot == "BME": + ref_value = ref_bme_kld[0] + plot_label = r"BME/BME$^{Ref.}$" + if plot == "KLD": + ref_value = ref_bme_kld[1] + plot_label = ( + "$D_{KL}[p(\\theta|y_*),p(\\theta)]" + " / D_{KL}^{Ref.}[p(\\theta|y_*), " + "p(\\theta)]$" + ) # Difference between BME/KLD and the ref. values - all_errors = np.divide(all_errors, - np.full((all_errors.shape), - refValue)) + all_errors = np.divide( + all_errors, np.full((all_errors.shape), ref_value) + ) # Plot baseline for zero, i.e. no difference - plt.axhline(y=1.0, xmin=0, xmax=1, c='green', - ls='--', lw=2) + plt.axhline(y=1.0, xmin=0, xmax=1, c="green", ls="--", lw=2) # Plot each UtilFuncs - labels = np.arange( - n_init_samples, n_total_samples+1, step1) - draw_plot(all_errors[:, ::step2], labels, edge_color, - fill_color, idx) + labels = np.arange(n_init_samples, n_total_samples + 1, step1) + draw_plot( + all_errors[:, ::step2], labels, edge_color, fill_color, idx + ) plt.xticks(labels, labels) # Set the major and minor locators ax.xaxis.set_major_locator(ticker.AutoLocator()) ax.xaxis.set_minor_locator(ticker.AutoMinorLocator()) - ax.xaxis.grid(True, which='major', linestyle='-') - ax.xaxis.grid(True, which='minor', linestyle='--') + ax.xaxis.grid(True, which="major", linestyle="-") + ax.xaxis.grid(True, which="minor", linestyle="--") # Legend legend_elements = [] for idx, util in enumerate(util_funcs): - legend_elements.append(Patch(facecolor=fill_color[idx], - edgecolor=edge_color[idx], - label=util)) - plt.legend(handles=legend_elements[::-1], loc='best') - - if plot not in ['BME','KLD']: - plt.yscale('log') + legend_elements.append( + Patch( + facecolor=fill_color[idx], + edgecolor=edge_color[idx], + label=util, + ) + ) + plt.legend(handles=legend_elements[::-1], loc="best") + + if plot not in ["BME", "KLD"]: + plt.yscale("log") plt.autoscale(True) - plt.xlabel('\\# of training samples') + plt.xlabel("\\# of training samples") plt.ylabel(plot_label) plt.title(plot) # save the current figure - plot_name = plot.replace(' ', '_') + plot_name = plot.replace(" ", "_") fig.savefig( - f'./{newpath}/seq_{plot_name}.pdf', - bbox_inches='tight' + f"./{newpath}/seq_{plot_name}.{self.out_format}", + bbox_inches="tight", ) # Destroy the current plot plt.close() # Save arrays into files - f = open(f'./{newpath}/seq_{plot_name}.txt', 'w') + f = open(f"./{newpath}/seq_{plot_name}.txt", "w") f.write(str(sorted_seq_opt)) f.close() else: @@ -444,45 +548,58 @@ class PostProcessing: step = engine.ExpDesign.n_new_samples else: step = 1 - x_idx = np.arange(n_init_samples, n_total_samples+1, step) + x_idx = np.arange(n_init_samples, n_total_samples + 1, step) if n_total_samples not in x_idx: x_idx = np.hstack((x_idx, n_total_samples)) - if plot in ['KLD', 'BME']: + if plot in ["KLD", "BME"]: # BME convergence if refBME is provided - if ref_BME_KLD is not None: - if plot == 'BME': - refValue = ref_BME_KLD[0] - plot_label = r'BME/BME$^{Ref.}$' - if plot == 'KLD': - refValue = 
ref_BME_KLD[1] - plot_label = '$D_{KL}[p(\\theta|y_*),p(\\theta)]'\ - ' / D_{KL}^{Ref.}[p(\\theta|y_*), '\ - 'p(\\theta)]$' + if ref_bme_kld is not None: + if plot == "BME": + ref_value = ref_bme_kld[0] + plot_label = r"BME/BME$^{Ref.}$" + if plot == "KLD": + ref_value = ref_bme_kld[1] + plot_label = ( + "$D_{KL}[p(\\theta|y_*),p(\\theta)]" + " / D_{KL}^{Ref.}[p(\\theta|y_*), " + "p(\\theta)]$" + ) # Difference between BME/KLD and the ref. values - values = np.divide(seq_values, - np.full((seq_values.shape), - refValue)) + values = np.divide( + seq_values, np.full((seq_values.shape), ref_value) + ) # Plot baseline for zero, i.e. no difference - plt.axhline(y=1.0, xmin=0, xmax=1, c='green', - ls='--', lw=2) + plt.axhline(y=1.0, xmin=0, xmax=1, c="green", ls="--", lw=2) # Set the limits plt.ylim([1e-1, 1e1]) # Create the plots - plt.semilogy(x_idx, values, marker=markers[idx], - color=colors[idx], ls='--', lw=2, - label=name.split("_rep", 1)[0]) + plt.semilogy( + x_idx, + values, + marker=markers[idx], + color=colors[idx], + ls="--", + lw=2, + label=name.split("_rep", 1)[0], + ) else: plot_label = plot # Create the plots - plt.plot(x_idx, seq_values, marker=markers[idx], - color=colors[idx], ls='--', lw=2, - label=name.split("_rep", 1)[0]) + plt.plot( + x_idx, + seq_values, + marker=markers[idx], + color=colors[idx], + ls="--", + lw=2, + label=name.split("_rep", 1)[0], + ) else: plot_label = plot @@ -491,346 +608,240 @@ class PostProcessing: # Plot the error evolution for each output # print(x_idx.shape) # print(seq_values.mean(axis=1).shape) - plt.semilogy(x_idx, seq_values.mean(axis=1), - marker=markers[idx], ls='--', lw=2, - color=colors[idx], - label=name.split("_rep", 1)[0]) + plt.semilogy( + x_idx, + seq_values.mean(axis=1), + marker=markers[idx], + ls="--", + lw=2, + color=colors[idx], + label=name.split("_rep", 1)[0], + ) # Set the major and minor locators ax.xaxis.set_major_locator(ticker.AutoLocator()) ax.xaxis.set_minor_locator(ticker.AutoMinorLocator()) - ax.xaxis.grid(True, which='major', linestyle='-') - ax.xaxis.grid(True, which='minor', linestyle='--') - - ax.tick_params(axis='both', which='major', direction='in', - width=3, length=10) - ax.tick_params(axis='both', which='minor', direction='in', - width=2, length=8) - plt.xlabel('Number of runs') + ax.xaxis.grid(True, which="major", linestyle="-") + ax.xaxis.grid(True, which="minor", linestyle="--") + + ax.tick_params( + axis="both", which="major", direction="in", width=3, length=10 + ) + ax.tick_params( + axis="both", which="minor", direction="in", width=2, length=8 + ) + plt.xlabel("Number of runs") plt.ylabel(plot_label) plt.title(plot) plt.legend(frameon=True) # save the current figure - plot_name = plot.replace(' ', '_') + plot_name = plot.replace(" ", "_") fig.savefig( - f'./{newpath}/seq_{plot_name}.pdf', - bbox_inches='tight' + f"./{newpath}/seq_{plot_name}.{self.out_format}", + bbox_inches="tight", ) # Destroy the current plot plt.close() # ---------------- Saving arrays into files --------------- - np.save(f'./{newpath}/seq_{plot_name}.npy', seq_values) - + np.save(f"./{newpath}/seq_{plot_name}.npy", seq_values) # ------------------------------------------------------------------------- - def sobol_indices(self, xlabel: str = 'Time [s]', plot_type: str = None): + def sobol_indices(self, plot_type: str = "line", save: bool = True): """ Visualizes and writes out Sobol' and Total Sobol' indices of the trained metamodel. One file is created for each index and output key. 
        Parameters
         ----------
-        xlabel : str, optional
-            Label of the x-axis. The default is `'Time [s]'`.
         plot_type : str, optional
-            Plot type. The default is `None`. This corresponds to line plot.
+            Plot type, supports 'line' for lineplots and 'bar' for barplots.
+            The default is `line`.
+        save : bool, optional
+            Write out the indices as csv files if set to True. The default
+            is True.
 
         Raises
         ------
         AttributeError
             MetaModel in given Engine needs to be of type 'pce' or 'apce'.
+        AttributeError
+            Plot type must be 'line' or 'bar'.
+
+        Returns
+        -------
+        sobol_all : dict
+            All possible Sobol' indices for the given metamodel.
+        total_sobol_all : dict
+            All Total Sobol' indices for the given metamodel.
 
         """
         # This function currently only supports PCE/aPCE
         metamod = self.engine.MetaModel
-        if not hasattr(metamod, 'meta_model_type'):
-            raise AttributeError('Sobol indices only support PCE-type models!')
-        if metamod.meta_model_type.lower() not in ['pce', 'apce']:
-            raise AttributeError('Sobol indices only support PCE-type models!')
+        if not hasattr(metamod, "meta_model_type"):
+            raise AttributeError("Sobol indices only support PCE-type models!")
+        if metamod.meta_model_type.lower() not in ["pce", "apce"]:
+            raise AttributeError("Sobol indices only support PCE-type models!")
+
+        if plot_type not in ["line", "bar"]:
+            raise AttributeError("The wanted plot type is not supported.")
 
         # Extract the necessary variables
-        basis_dict = metamod._basis_dict
-        coeffs_dict = metamod._coeffs_dict
-        n_params = metamod.ndim
         max_order = np.max(metamod._pce_deg)
-        sobol_cell_b = {}
-        total_sobol_b = {}
-        cov_Z_p_q = np.zeros((n_params))
-
-        for b_i in range(metamod.n_bootstrap_itrs):
-
-            sobol_cell_, total_sobol_ = {}, {}
-
-            for output in self.engine.out_names:
-
-                n_meas_points = len(coeffs_dict[f'b_{b_i+1}'][output])
-
-                # Initialize the (cell) array containing the (total) Sobol indices.
-                sobol_array = dict.fromkeys(range(1, max_order+1), [])
-                sobol_cell_array = dict.fromkeys(range(1, max_order+1), [])
-
-                for i_order in range(1, max_order+1):
-                    n_comb = math.comb(n_params, i_order)
-
-                    sobol_cell_array[i_order] = np.zeros((n_comb, n_meas_points))
-
-                total_sobol_array = np.zeros((n_params, n_meas_points))
-
-                # Initialize the cell to store the names of the variables
-                TotalVariance = np.zeros((n_meas_points))
-                # Loop over all measurement points and calculate sobol indices
-                for pIdx in range(n_meas_points):
-
-                    # Extract the basis indices (alpha) and coefficients
-                    Basis = basis_dict[f'b_{b_i+1}'][output][f'y_{pIdx+1}']
-
-                    try:
-                        clf_poly = metamod._clf_poly[f'b_{b_i+1}'][output][f'y_{pIdx+1}']
-                        PCECoeffs = clf_poly.coef_
-                    except:
-                        PCECoeffs = coeffs_dict[f'b_{b_i+1}'][output][f'y_{pIdx+1}']
-
-                    # Compute total variance
-                    TotalVariance[pIdx] = np.sum(np.square(PCECoeffs[1:]))
-
-                    nzidx = np.where(PCECoeffs != 0)[0]
-                    # Set all the Sobol indices equal to zero in the presence of a
-                    # null output.
-                    if len(nzidx) == 0:
-                        # This is buggy.
-                        for i_order in range(1, max_order+1):
-                            sobol_cell_array[i_order][:, pIdx] = 0
+        outputs = self.engine.out_names
+        if metamod.sobol is None:
+            metamod.calculate_sobol()
+        sobol_all, total_sobol_all = metamod.sobol, metamod.total_sobol
+        self.sobol, self.totalsobol = sobol_all, total_sobol_all
+
+        # TODO: move this to the PCE class?
+ # Save indices + if save: + for _, output in enumerate(outputs): + total_sobol = total_sobol_all[output] + np.savetxt( + f"{self.out_dir}totalsobol_" + output.replace("/", "_") + ".csv", + total_sobol.T, + delimiter=",", + header=",".join(self.par_names), + comments="", + ) - # Otherwise compute them by summing well-chosen coefficients - else: - nz_basis = Basis[nzidx] - for i_order in range(1, max_order+1): - idx = np.where( - np.sum(nz_basis > 0, axis=1) == i_order) - subbasis = nz_basis[idx] - Z = np.array( - list(combinations(range(n_params), i_order))) - - for q in range(Z.shape[0]): - Zq = Z[q] - subsubbasis = subbasis[:, Zq] - subidx = np.prod(subsubbasis, axis=1) > 0 - sum_ind = nzidx[idx[0][subidx]] - if TotalVariance[pIdx] == 0.0: - sobol_cell_array[i_order][q, pIdx] = 0.0 - else: - sobol = np.sum( - np.square(PCECoeffs[sum_ind])) - sobol /= TotalVariance[pIdx] - sobol_cell_array[i_order][q, pIdx] = sobol - - # Compute the TOTAL Sobol indices. - for ParIdx in range(n_params): - idx = nz_basis[:, ParIdx] > 0 - sum_ind = nzidx[idx] - - if TotalVariance[pIdx] == 0.0: - total_sobol_array[ParIdx, pIdx] = 0.0 - else: - sobol = np.sum(np.square(PCECoeffs[sum_ind])) - sobol /= TotalVariance[pIdx] - total_sobol_array[ParIdx, pIdx] = sobol - - # ----- if PCA selected: Compute covariance ----- - if metamod.dim_red_method.lower() == 'pca': - # Extract the basis indices (alpha) and coefficients for - # next component - if pIdx < n_meas_points-1: - nextBasis = basis_dict[f'b_{b_i+1}'][output][f'y_{pIdx+2}'] - if metamod.bootstrap_method != 'fast' or b_i == 0: - clf_poly = metamod._clf_poly[f'b_{b_i+1}'][output][f'y_{pIdx+2}'] - nextPCECoeffs = clf_poly.coef_ - else: - nextPCECoeffs = coeffs_dict[f'b_{b_i+1}'][output][f'y_{pIdx+2}'] - - # Choose the common non-zero basis - mask = (Basis[:, None] == - nextBasis).all(-1).any(-1) - n_mask = (nextBasis[:, None] == - Basis).all(-1).any(-1) - - # Compute the covariance in Eq 17. - for ParIdx in range(n_params): - idx = (mask) & (Basis[:, ParIdx] > 0) - n_idx = (n_mask) & (nextBasis[:, ParIdx] > 0) - try: - cov_Z_p_q[ParIdx] += np.sum(np.dot( - PCECoeffs[idx], nextPCECoeffs[n_idx]) - ) - except: - pass - - # Compute the sobol indices according to Ref. 2 - if metamod.dim_red_method.lower() == 'pca': - n_c_points = self.engine.ExpDesign.Y[output].shape[1] - PCA = metamod.pca[f'b_{b_i+1}'][output] - compPCA = PCA.components_ - nComp = compPCA.shape[0] - var_Z_p = PCA.explained_variance_ - - # Extract the sobol index of the components - for i_order in range(1, max_order+1): - n_comb = math.comb(n_params, i_order) - sobol_array[i_order] = np.zeros((n_comb, n_c_points)) - Z = np.array( - list(combinations(range(n_params), i_order))) - - # Loop over parameters - for q in range(Z.shape[0]): - S_Z_i = sobol_cell_array[i_order][q] - - for tIdx in range(n_c_points): - var_Y_t = np.var( - self.engine.ExpDesign.Y[output][:, tIdx]) - if var_Y_t == 0.0: - term1, term2 = 0.0, 0.0 - else: - # Eq. 17 - term1 = 0.0 - for i in range(nComp): - a = S_Z_i[i] * var_Z_p[i] - a *= compPCA[i, tIdx]**2 - term1 += a - - # TODO: Term 2 - # term2 = 0.0 - # for i in range(nComp-1): - # term2 += cov_Z_p_q[q] * compPCA[i, tIdx] - # term2 *= compPCA[i+1, tIdx] - # term2 *= 2 - - sobol_array[i_order][q, - tIdx] = term1 # + term2 - - # Devide over total output variance Eq. 18 - sobol_array[i_order][q, tIdx] /= var_Y_t - - # Compute the TOTAL Sobol indices. 
- total_sobol = np.zeros((n_params, n_c_points)) - for ParIdx in range(n_params): - S_Z_i = total_sobol_array[ParIdx] - - for tIdx in range(n_c_points): - var_Y_t = np.var( - self.engine.ExpDesign.Y[output][:, tIdx]) - if var_Y_t == 0.0: - term1, term2 = 0.0, 0.0 - else: - term1 = 0 - for i in range(nComp): - term1 += S_Z_i[i] * var_Z_p[i] * \ - (compPCA[i, tIdx]**2) - - # Term 2 - term2 = 0 - for i in range(nComp-1): - term2 += cov_Z_p_q[ParIdx] * compPCA[i, tIdx] \ - * compPCA[i+1, tIdx] - term2 *= 2 - - total_sobol[ParIdx, tIdx] = term1 # + term2 - - # Devide over total output variance Eq. 18 - total_sobol[ParIdx, tIdx] /= var_Y_t - - sobol_cell_[output] = sobol_array - total_sobol_[output] = total_sobol - else: - sobol_cell_[output] = sobol_cell_array - total_sobol_[output] = total_sobol_array - - # Save for each bootsrtap iteration - sobol_cell_b[b_i] = sobol_cell_ - total_sobol_b[b_i] = total_sobol_ - - # Average total sobol indices - total_sobol_all = {} - for i in sorted(total_sobol_b): - for k, v in total_sobol_b[i].items(): - if k not in total_sobol_all: - total_sobol_all[k] = [None] * len(total_sobol_b) - total_sobol_all[k][i] = v - - self.total_sobol = {} - for output in self.engine.out_names: - for output in self.engine.out_names: - self.total_sobol[output] = np.mean(total_sobol_all[output], axis=0) - - # ---------------- Plot ----------------------- - par_names = self.engine.ExpDesign.par_names - x_values_orig = self.engine.ExpDesign.x_values + for i_order in range(1, max_order + 1): + sobol = sobol_all[i_order][output][0] + np.savetxt( + f"{self.out_dir}sobol_{i_order}_" + + output.replace("/", "_") + + ".csv", + sobol.T, + delimiter=",", + header=",".join(self.par_names), + comments="", + ) + + # Plot Sobol' indices + self.plot_type = plot_type + for i_order in range(1, max_order + 1): + par_names_i = ( + list(combinations(self.par_names, i_order)) + if (i_order != 1) + else self.par_names + ) + self.plot_sobol(par_names_i, outputs, sobol_type="sobol", i_order=i_order) + self.plot_sobol(self.par_names, outputs, sobol_type="totalsobol") + + return sobol_all, total_sobol_all + + def plot_sobol( + self, + par_names: list, + outputs: list, + sobol_type: str = "sobol", + i_order: int = 0, + ) -> None: + """ + Generate plots for each output in the given set of Sobol' indices. - fig = plt.figure() + Parameters + ---------- + par_names : list + Parameter names for each Sobol' index. + outputs : list + Output names to be plotted. + sobol_type : string, optional + Type of Sobol' indices to visualize. Can be either + 'sobol' or 'totalsobol'. The default is 'sobol'. + i_order : int, optional + Order of Sobol' index that should be plotted. + This parameter is only applied for sobol_type = 'sobol'. + The default is 0. 
- for outIdx, output in enumerate(self.engine.out_names): - for outIdx, output in enumerate(self.engine.out_names): + Returns + ------- + None - # Extract total Sobol indices - total_sobol = self.total_sobol[output] + """ + sobol = None + if sobol_type == "sobol": + sobol = self.sobol[i_order] + if sobol_type == "totalsobol": + sobol = self.totalsobol + + for _, output in enumerate(outputs): + x = ( + self.x_values[output] + if isinstance(self.x_values, dict) + else self.x_values + ) + sobol_ = sobol[output] + if sobol_type == "sobol": + sobol_ = sobol_[0] # Compute quantiles - q_5 = np.quantile(total_sobol_all[output], q=0.05, axis=0) - q_97_5 = np.quantile(total_sobol_all[output], q=0.975, axis=0) - - # Extract a list of x values - if type(x_values_orig) is dict: - x = x_values_orig[output] - else: - x = x_values_orig + q_5 = np.quantile(sobol[output], q=0.05, axis=0) + q_97_5 = np.quantile(sobol[output], q=0.975, axis=0) - if plot_type == 'bar': + if self.plot_type == "bar": + fig = plt.figure() ax = fig.add_axes([0, 0, 1, 1]) - dict1 = {xlabel: x} - dict2 = {param: sobolIndices for param, sobolIndices - in zip(par_names, total_sobol)} + dict1 = {self.xlabel: x} + dict2 = dict(zip(par_names, sobol_)) df = pd.DataFrame({**dict1, **dict2}) - df.plot(x=xlabel, y=par_names, kind="bar", ax=ax, rot=0, - colormap='Dark2', yerr=q_97_5-q_5) - ax.set_ylabel('Total Sobol indices, $S^T$') + df.plot( + x=self.xlabel, + y=par_names, + kind="bar", + ax=ax, + rot=0, + colormap="Dark2", + yerr=q_97_5 - q_5, + ) + if sobol_type == "sobol": + ax.set_ylabel("Sobol indices, $S^T$") + elif sobol_type == "totalsobol": + ax.set_ylabel("Total Sobol indices, $S^T$") else: - for i, sobolIndices in enumerate(total_sobol): - plt.plot(x, sobolIndices, label=par_names[i], - marker='x', lw=2.5) + fig = plt.figure() + ax = fig.add_axes([0, 0, 1, 1]) + for i, sobol_indices in enumerate(sobol_): + plt.plot( + x, + sobol_indices, + label=par_names[i], + marker="x", + lw=2.5, + ) plt.fill_between(x, q_5[i], q_97_5[i], alpha=0.15) - plt.ylabel('Total Sobol indices, $S^T$') - plt.xlabel(xlabel) - - plt.title(f'Sensitivity analysis of {output}') - if plot_type != 'bar': - plt.legend(loc='best', frameon=True) - - # Save indices - np.savetxt(f'./{self.out_dir}totalsobol_' + - output.replace('/', '_') + '.csv', - total_sobol.T, delimiter=',', - header=','.join(par_names), comments='') + if sobol_type == "sobol": + ax.set_ylabel("Sobol indices, $S^T$") + elif sobol_type == "totalsobol": + ax.set_ylabel("Total Sobol indices, $S^T$") + plt.xlabel(self.xlabel) + plt.legend(loc="best", frameon=True) - # save the current figure - fig.savefig( - f'./{self.out_dir}Sobol_indices_{output}.pdf', - bbox_inches='tight' - ) - - # Destroy the current plot - plt.close() - - return self.total_sobol + if sobol_type == "sobol": + plt.title(f"{i_order} order Sobol' indices of {output}") + fig.savefig( + f"{self.out_dir}Sobol_indices_{i_order}_{output}.{self.out_format}", + bbox_inches="tight", + ) + elif sobol_type == "totalsobol": + plt.title(f"Total Sobol' indices of {output}") + fig.savefig( + f"{self.out_dir}TotalSobol_indices_{output}.{self.out_format}", + bbox_inches="tight", + ) + plt.clf() # ------------------------------------------------------------------------- - def check_reg_quality(self, n_samples: int = 1000, samples=None, outputs: dict = None) -> None: + def check_reg_quality( + self, n_samples: int = 1000, samples=None, outputs: dict = None + ) -> None: """ Checks the quality of the metamodel for single output models based on: 
https://towardsdatascience.com/how-do-you-check-the-quality-of-your-regression-model-in-python-fa61759ff685
@@ -843,15 +854,17 @@ class PostProcessing:
             Parameter sets to use for the check. The default is None.
         outputs : dict, optional
             Output dictionary with model outputs for all given output types in
-            `Model.Output.names`. The default is None.
+            `engine.out_names`. The default is None.
 
-        Return
-        ------
+        Returns
+        -------
         None
 
         """
         if samples is None:
-            samples = self._get_sample(n_samples)
+            samples = self.engine.ExpDesign.generate_samples(
+                n_samples, sampling_method="random"
+            )
         else:
             n_samples = samples.shape[0]
 
@@ -867,22 +880,24 @@ class PostProcessing:
 
         # ------ Residuals vs. predicting variables ------
         # Check the assumptions of linearity and independence
-        fig1 = plt.figure()
         for i, par in enumerate(self.engine.ExpDesign.par_names):
+            plt.scatter(x=samples[:, i], y=residuals, color="blue", edgecolor="k")
             plt.title(f"{key}: Residuals vs. {par}")
-            plt.scatter(
-                x=samples[:, i], y=residuals, color='blue', edgecolor='k')
             plt.grid(True)
-            xmin, xmax = min(samples[:, i]), max(samples[:, i])
-            plt.hlines(y=0, xmin=xmin*0.9, xmax=xmax*1.1, color='red',
-                       lw=3, linestyle='--')
+            plt.hlines(
+                y=0,
+                xmin=min(samples[:, i]) * 0.9,
+                xmax=max(samples[:, i]) * 1.1,
+                color="red",
+                lw=3,
+                linestyle="--",
+            )
             plt.xlabel(par)
-            plt.ylabel('Residuals')
-
-            # save the current figure
-            fig1.savefig(f'./{self.out_dir}/Residuals_vs_Par_{i+1}.pdf',
-                         bbox_inches='tight')
-            # Destroy the current plot
+            plt.ylabel("Residuals")
+            plt.savefig(
+                f"./{self.out_dir}/Residuals_vs_Par_{i+1}.{self.out_format}",
+                bbox_inches="tight",
+            )
             plt.close()
 
         # ------ Fitted vs. residuals ------
@@ -890,44 +905,46 @@ class PostProcessing:
+        for i in range(y_metamod_val[key].shape[0]):
+            plt.scatter(x=y_metamod_val[key][i, :], y=residuals[i, :],
+                        color="blue", edgecolor="k")
         plt.title(f"{key}: Residuals vs. fitted values")
-        plt.scatter(x=y_pce_val_, y=residuals, color='blue', edgecolor='k')
         plt.grid(True)
-        xmin, xmax = min(y_val_), max(y_val_)
-        plt.hlines(y=0, xmin=xmin*0.9, xmax=xmax*1.1, color='red', lw=3,
-                   linestyle='--')
+        plt.hlines(
+            y=0,
+            xmin=min(y_val[key]) * 0.9,
+            xmax=max(y_val[key]) * 1.1,
+            color="red",
+            lw=3,
+            linestyle="--",
+        )
         plt.xlabel(key)
-        plt.ylabel('Residuals')
-
-        # save the current figure
-        fig2.savefig(f'./{self.out_dir}/Fitted_vs_Residuals.pdf',
-                     bbox_inches='tight')
-        # Destroy the current plot
+        plt.ylabel("Residuals")
+        plt.savefig(
+            f"./{self.out_dir}/Fitted_vs_Residuals.{self.out_format}",
+            bbox_inches="tight",
+        )
         plt.close()
 
         # ------ Histogram of normalized residuals ------
-        fig3 = plt.figure()
-        resid_pearson = residuals / (max(residuals)-min(residuals))
-        plt.hist(resid_pearson, bins=20, edgecolor='k')
-        plt.ylabel('Count')
-        plt.xlabel('Normalized residuals')
+        resid_pearson = residuals / (max(residuals) - min(residuals))
+        plt.hist(resid_pearson, bins=20, edgecolor="k")
+        plt.ylabel("Count")
+        plt.xlabel("Normalized residuals")
         plt.title(f"{key}: Histogram of normalized residuals")
 
         # Normality (Shapiro-Wilk) test of the residuals
         ax = plt.gca()
         _, p = stats.shapiro(residuals)
-        if p < 0.01:
-            annText = "The residuals seem to come from a Gaussian Process."
+        # Shapiro-Wilk: a small p-value rejects the normality hypothesis.
+        if p >= 0.01:
+            ann_text = "The residuals seem to come from a Gaussian Process."
         else:
-            annText = "The normality assumption may not hold."
+            ann_text = "The normality assumption may not hold."
+ at = AnchoredText( + ann_text, prop={"size": 30}, frameon=True, loc="upper left" + ) at.patch.set_boxstyle("round,pad=0.,rounding_size=0.2") ax.add_artist(at) - - # save the current figure - fig3.savefig(f'./{self.out_dir}/Hist_NormResiduals.pdf', - bbox_inches='tight') - # Destroy the current plot + plt.savefig( + f"./{self.out_dir}/Hist_NormResiduals.{self.out_format}", + bbox_inches="tight", + ) plt.close() # ------ Q-Q plot of the normalized residuals ------ @@ -939,17 +956,16 @@ class PostProcessing: plt.ylabel("Sample quantiles") plt.title(f"{key}: Q-Q plot of normalized residuals") plt.grid(True) - - # save the current figure - plt.savefig(f'./{self.out_dir}/QQPlot_NormResiduals.pdf', - bbox_inches='tight') - # Destroy the current plot + plt.savefig( + f"./{self.out_dir}/QQPlot_NormResiduals.{self.out_format}", + bbox_inches="tight", + ) plt.close() # ------------------------------------------------------------------------- def plot_metamodel_3d(self, n_samples=10): """ - Visualize the results of a PCE MetaModel as a 3D surface over two input + Visualize the results of a MetaModel as a 3D surface over two input parameters. Parameters @@ -964,167 +980,58 @@ class PostProcessing: Returns ------- - None. + None """ if self.engine.ExpDesign.ndim != 2: raise AttributeError( - 'This function is only applicable if the MetaModel input dimension is 2.') + "This function is only applicable if the MetaModel input dimension is 2." + ) samples = self.engine.ExpDesign.generate_samples(n_samples) samples = np.sort(np.sort(samples, axis=1), axis=0) - mean, stdev = self.engine.eval_metamodel(samples=samples) + mean, _ = self.engine.eval_metamodel(samples=samples) - if self.engine.emulator: - title = 'MetaModel' - else: - title = 'Model' - X, Y = np.meshgrid(samples[:, 0], samples[:, 1]) + title = "MetaModel" if self.engine.emulator else "Model" + x, y = np.meshgrid(samples[:, 0], samples[:, 1]) for name in self.engine.out_names: for t in range(mean[name].shape[1]): fig = plt.figure() - ax = plt.axes(projection='3d') - ax.plot_surface(X, Y, np.atleast_2d(mean[name][:, t]), rstride=1, cstride=1, - cmap='viridis', edgecolor='none') + ax = plt.axes(projection="3d") + ax.plot_surface( + x, + y, + np.atleast_2d(mean[name][:, t]), + rstride=1, + cstride=1, + cmap="viridis", + edgecolor="none", + ) ax.set_title(title) - ax.set_xlabel('$x_1$') - ax.set_ylabel('$x_2$') - ax.set_zlabel('$f(x_1,x_2)$') + ax.set_xlabel("$x_1$") + ax.set_ylabel("$x_2$") + ax.set_zlabel("$f(x_1,x_2)$") plt.grid() - - # save the figure to file - fig.savefig(f'./{self.out_dir}/3DPlot_{title}_{name}{t}.pdf', - bbox_inches='tight') + fig.savefig( + f"./{self.out_dir}/3DPlot_{title}_{name}{t}.{self.out_format}", + bbox_inches="tight", + ) plt.close(fig) # ------------------------------------------------------------------------- - - def _get_sample(self, n_samples): - """ - Generates random samples taken from the input parameter space. - - Parameters - ---------- - n_samples : int - Number of samples to generate. - - Returns - ------- - samples : array of shape (n_samples, n_params) - Generated samples. - - """ - samples = self.engine.ExpDesign.generate_samples( - n_samples, - sampling_method='random') - return samples - - # ------------------------------------------------------------------------- - def _eval_model(self, samples, key_str='Valid'): - """ - Evaluates Forward Model on the given samples. - - Parameters - ---------- - samples : array of shape (n_samples, n_params) - Samples to evaluate the model at. 
- key_str : str, optional - Key string pass to the model. The default is 'Valid'. - - Returns - ------- - model_outs : dict - Dictionary of results. - - """ - samples = self._get_sample() - model_outs, _ = self.engine.Model.run_model_parallel( - samples, key_str=key_str) - - return model_outs - - # ------------------------------------------------------------------------- - def _plot_validation(self, samples): - """ - Plots outputs for visual comparison of metamodel outputs with that of - the (full) original model. - - Returns - ------- - None. - - """ - # This function currently only supports PCE/aPCE - metamod = self.engine.MetaModel - if not hasattr(metamod, 'meta_model_type'): - raise AttributeError( - 'This evaluation only support PCE-type models!') - if metamod.meta_model_type.lower() not in ['pce', 'apce']: - raise AttributeError( - 'This evaluation only support PCE-type models!') - - # get the samples - y_pce_val = self.pce_out_mean - y_val = self.model_out_dict - - fig = plt.figure() - # Fit the data(train the model) - for key in y_pce_val.keys(): - - y_pce_val_ = y_pce_val[key] - y_val_ = y_val[key] - - regression_model = LinearRegression() - regression_model.fit(y_pce_val_, y_val_) - - # Predict - x_new = np.linspace(np.min(y_pce_val_), np.max(y_val_), 100) - y_predicted = regression_model.predict(x_new[:, np.newaxis]) - - plt.scatter(y_pce_val_, y_val_, color='gold', linewidth=2) - plt.plot(x_new, y_predicted, color='k') - - # Calculate the adjusted R_squared and RMSE - # the total number of explanatory variables in the model - # (not including the constant term) - length_list = [] - for key, value in metamod._coeffs_dict['b_1'][key].items(): - length_list.append(len(value)) - n_predictors = min(length_list) - n_samples = x_val.shape[0] - - R2 = r2_score(y_pce_val_, y_val_) - AdjR2 = 1 - (1 - R2) * (n_samples - 1) / \ - (n_samples - n_predictors - 1) - rmse = mean_squared_error(y_pce_val_, y_val_, squared=False) - - plt.annotate(f'RMSE = {rmse:.3f}\n Adjusted $R^2$ = {AdjR2:.3f}', - xy=(0.05, 0.85), xycoords='axes fraction') - - plt.ylabel("Original Model") - plt.xlabel("PCE Model") - plt.grid() - - # save the current figure - plot_name = key.replace(' ', '_') - fig.savefig(f'./{self.out_dir}/Model_vs_PCEModel_{plot_name}.pdf', - bbox_inches='tight') - - # Destroy the current plot - plt.close() - - # ------------------------------------------------------------------------- - def _plot_validation_multi(self, x_values=[], x_axis="x [m]"): + def _plot_validation_multi(self, out_mean, out_std, model_out): """ Plots outputs for visual comparison of metamodel outputs with that of the (full) multioutput original model Parameters ---------- - x_values : list or array, optional - List of x values. The default is []. - x_axis : str, optional - Label of the x axis. The default is "x [m]". + out_mean : dict + MetaModel mean outputs. + out_std : dict + MetaModel stdev outputs. + model_out : dict + Model outputs. Raises ------ @@ -1132,65 +1039,61 @@ class PostProcessing: Returns ------- - None. 
+        None
 
         """
-        # This function currently only supports PCE/aPCE
-        metamod = self.engine.MetaModel
-        if not hasattr(metamod, 'meta_model_type'):
-            raise AttributeError(
-                'This evaluation only support PCE-type models!')
-        if metamod.meta_model_type.lower() not in ['pce', 'apce']:
-            raise AttributeError(
-                'This evaluation only support PCE-type models!')
-
         # List of markers and colors
-        color = cycle((['b', 'g', 'r', 'y', 'k']))
-        marker = cycle(('x', 'd', '+', 'o', '*'))
+        color = cycle((["b", "g", "r", "y", "k"]))
+        marker = cycle(("x", "d", "+", "o", "*"))
+        metamod_name = self.engine.MetaModel.meta_model_type.lower()
 
-        fig = plt.figure()
         # Plot the model vs PCE model
+        fig = plt.figure()
         for _, key in enumerate(self.engine.out_names):
-
-            y_pce_val = self.pce_out_mean[key]
-            y_pce_val_std = self.pce_out_std[key]
-            y_val = self.model_out_dict[key]
-            try:
-                x = self.model_out_dict['x_values'][key]
-            except (TypeError, IndexError):
-                x = x_values
+            y_metamod_mean = out_mean[key]
+            y_metamod_std = out_std[key]
+            y_val = model_out[key]
 
             for idx in range(y_val.shape[0]):
                 color_ = next(color)
                 marker_ = next(marker)
-
-                plt.plot(x, y_val[idx], color=color_, marker=marker_,
-                         label='$Y_{%s}^M$' % (idx+1))
-
-                plt.plot(x, y_pce_val[idx], color=color_, marker=marker_,
-                         linestyle='--',
-                         label='$Y_{%s}^{PCE}$' % (idx+1))
-                plt.fill_between(x, y_pce_val[idx]-1.96*y_pce_val_std[idx],
-                                 y_pce_val[idx]+1.96*y_pce_val_std[idx],
-                                 color=color_, alpha=0.15)
+                plt.plot(
+                    self.x_values,
+                    y_val[idx],
+                    color=color_,
+                    marker=marker_,
+                    label="$Y_{%s}^M$" % (idx + 1),
+                )
+                plt.plot(
+                    self.x_values,
+                    y_metamod_mean[idx],
+                    color=color_,
+                    marker=marker_,
+                    linestyle="--",
+                    label="$Y_{{{}}}^{{{}}}$".format(idx + 1, metamod_name),
+                )
+                plt.fill_between(
+                    self.x_values,
+                    y_metamod_mean[idx] - 1.96 * y_metamod_std[idx],
+                    y_metamod_mean[idx] + 1.96 * y_metamod_std[idx],
+                    color=color_,
+                    alpha=0.15,
+                )
 
                 # Calculate the RMSE
-                rmse = mean_squared_error(y_pce_val, y_val, squared=False)
-                R2 = r2_score(y_pce_val[idx].reshape(-1, 1),
-                              y_val[idx].reshape(-1, 1))
-
-                plt.annotate(f'RMSE = {rmse:.3f}\n $R^2$ = {R2:.3f}',
-                             xy=(0.85, 0.1), xycoords='axes fraction')
+                rmse = root_mean_squared_error(y_val, y_metamod_mean)
+                r_2 = r2_score(
+                    y_metamod_mean[idx].reshape(-1, 1), y_val[idx].reshape(-1, 1)
+                )
+                plt.annotate(
+                    f"RMSE = {rmse}\n $R^2$ = {r_2:.3f}",
+                    xy=(0.85, 0.1),
+                    xycoords="axes fraction",
+                )
 
             plt.ylabel(key)
-            plt.xlabel(x_axis)
-            plt.legend(loc='best')
+            plt.xlabel(self.xlabel)
+            plt.legend(loc="best")
             plt.grid()
-
-            # save the current figure
-            plot_name = key.replace(' ', '_')
-            fig.savefig(f'./{self.out_dir}/Model_vs_PCEModel_{plot_name}.pdf',
-                        bbox_inches='tight')
-
-            # Destroy the current plot
+            key = key.replace(" ", "_")
+            fig.savefig(
+                f"./{self.out_dir}/Model_vs_{metamod_name}Model_{key}.{self.out_format}",
+                bbox_inches="tight",
+            )
             plt.close()
diff --git a/src/bayesvalidrox/pylink/pylink.py b/src/bayesvalidrox/pylink/pylink.py
index be6f9c82a43f7ab7fc87bfcc4941cb74cc86e0e7..56821a682cfa5de7982b430432113aae10f2740d 100644
--- a/src/bayesvalidrox/pylink/pylink.py
+++ b/src/bayesvalidrox/pylink/pylink.py
@@ -340,8 +340,14 @@ class PyLinkForwardModel(object):
                         break
             else:
                 # Run command
+                print(f"Running command: ./../{command}")
+
                 Process = os.system(f'./../{command}')
                 if Process != 0:
+
+                    print(f"Command failed with exit code: {Process}")
+
+                    print('\nMessage 1:')
                     print(f'\tIf the value of \'{Process}\' is a non-zero value'
                           ', then compilation problems occur \n')
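Review note: `os.system` returns only a raw exit status and interleaves its output with the interpreter's. If the extra diagnostics are meant to stay, `subprocess.run` would make the failure check explicit; a sketch under that assumption, with `command` as in `pylink.py`, not part of the patch itself:

```python
import subprocess

# Explicit exit-code check instead of interpreting os.system's return value.
result = subprocess.run(f"./../{command}", shell=True, check=False)
if result.returncode != 0:
    print(f"Command failed with exit code: {result.returncode}")
```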
diff --git a/src/bayesvalidrox/surrogate_models/engine.py b/src/bayesvalidrox/surrogate_models/engine.py
index ad3449ba964e74d0d0b558b2a630840ab485e100..ba204947a5ae5d8b3e99812cf787b66e43757fca 100644
--- a/src/bayesvalidrox/surrogate_models/engine.py
+++ b/src/bayesvalidrox/surrogate_models/engine.py
@@ -21,6 +21,7 @@ from .polynomial_chaos import PCE
 from .surrogate_models import MetaModel as MM
 from bayesvalidrox.bayes_inference.bayes_inference import BayesInference
 from bayesvalidrox.bayes_inference.discrepancy import Discrepancy
+from bayesvalidrox.surrogate_models.supplementary import root_mean_squared_error
 
 
 class Engine:
@@ -277,6 +278,10 @@ class Engine:
             mc_ref = True
             self.Model.read_observation('mc_ref')
 
+        if self.Model.mc_reference is not None:
+            mc_ref = True
+            self.Model.read_observation('mc_ref')
+
         # elif self.Model.mc_reference != {}:
         #     mc_ref = True
         #     self.Model.read_observation('mc_ref')
@@ -996,7 +1001,7 @@ class Engine:
         # TODO: check that the self-implemented version of RMSE without the additional settings
         #       returns the same results as the former mse - scikit-version!
         for key in self.out_names:
-            rms_error[key] = self._root_mean_squared_error(
+            rms_error[key] = root_mean_squared_error(
                 valid_model_runs[key], valid_PCE_runs[key]#,
                 #multioutput='raw_values', # TODO: how does this parameter translate?
                 #sample_weight=None, # No weights
@@ -1018,25 +1023,6 @@ class Engine:
 
         return rms_error, valid_error
 
-    def _root_mean_squared_error(self, reference, approximation):
-        """
-        Implementation of RMSE
-
-        Parameters
-        ----------
-        reference : np.array
-            Reference values
-        approximation : np.array
-            Approximation values
-
-        Returns
-        -------
-        rmse : np.array
-            RMSE of approximation against the reference along the samples.
-        """
-        rmse = np.sqrt(np.mean(np.power(reference-approximation,2), axis = 0))
-        return rmse
-
     # -------------------------------------------------------------------------
     def _error_Mean_Std(self):
         """
@@ -1064,8 +1050,15 @@ class Engine:
         for output in self.out_names:
             # Compute the error between mean and std of MetaModel and OrigModel
             # TODO: write test that checks if mc_reference exists
-            rmse_mean += self._root_mean_squared_error(self.Model.mc_reference['mean'], means[output])
-            rmse_std += self._root_mean_squared_error(self.Model.mc_reference['std'], stds[output])
+
+            if self.Model.mc_reference is None:
+                raise AttributeError(
+                    'Model.mc_reference needs to be given to calculate the error mean!')
+
+            print(f"Reference mean: {self.Model.mc_reference}")
+
+            rmse_mean += root_mean_squared_error(self.Model.mc_reference['mean'], means[output])
+            rmse_std += root_mean_squared_error(self.Model.mc_reference['std'], stds[output])
 
         return rmse_mean, rmse_std
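Review note: the RMSE helper now lives in `supplementary` and reduces over the sample axis (axis=0), returning one value per output column. A small self-contained check of that contract:

```python
import numpy as np
from bayesvalidrox.surrogate_models.supplementary import root_mean_squared_error

reference = np.array([[0.4, 0.4], [0.5, 0.6]])          # two samples, two columns
approximation = np.array([[0.42, 0.38], [0.47, 0.63]])
rmse = root_mean_squared_error(reference, approximation)
print(rmse.shape)  # (2,) -- one RMSE per output column
```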
diff --git a/src/bayesvalidrox/surrogate_models/polynomial_chaos.py b/src/bayesvalidrox/surrogate_models/polynomial_chaos.py
index 620ea090885c3514f7ac2aa989d27951e2c86b5a..9c0a4f5fcb800e75160a43ef8f0714aed65865e8 100644
--- a/src/bayesvalidrox/surrogate_models/polynomial_chaos.py
+++ b/src/bayesvalidrox/surrogate_models/polynomial_chaos.py
@@ -133,6 +133,9 @@ class PCE(MetaModel):
         self._q_norm_dict = None
         self._deg_dict = None
 
+        self.sobol = None
+        self.total_sobol = None
+
         # Initialize the regression options as a dictionary
         self.set_regression_options()
@@ -926,7 +929,11 @@ class PCE(MetaModel):
 
         return pce_means, pce_stds
 
-    def calculate_sobol(self, Y_train = None):
+    def calculate_sobol(self, y_train=None):
         """
         Provides Sobol' indices as a sensitivity measure to infer the
         importance of the input parameters. See Eq. 27 in [1] for more
         details. For the
@@ -944,7 +951,11 @@ class PCE(MetaModel):
 
         Parameters
         ----------
-        Y_train: dict, optional
+        y_train: dict, optional
             Training outputs. They are needed when used in combination
             with PCA. The default is None
 
         Returns
         -------
         sobol: dict
             The Sobol' indices
         total_sobol: dict
             Total Sobol' indices
 
         """
+        if self.dim_red_method == 'pca' and y_train is None:
+            raise AttributeError(
+                "Calculation of Sobol' indices with PCA expects training "
+                "outputs, but none are given.")
+
         max_order = np.max(self._pce_deg)
         n_params = len(self.input_obj.Marginals)
         cov_zpq = np.zeros((n_params))
@@ -1058,7 +1075,11 @@ class PCE(MetaModel):
 
             # Compute the sobol indices according to Ref. 2
             if self.dim_red_method.lower() == "pca":
-                n_c_points = Y_train[output].shape[1]
+                n_c_points = y_train[output].shape[1]
                 pca = self.pca[f"b_{b_i+1}"][output]
                 comp_pca = pca.components_
                 n_comp = comp_pca.shape[0]
@@ -1076,7 +1097,11 @@ class PCE(MetaModel):
 
                     for t_idx in range(n_c_points):
                         var_yt = np.var(
-                            Y_train[output][:, t_idx]
+                            y_train[output][:, t_idx]
                         )
                         term1, term2 = 0.0, 0.0
                         if var_yt != 0.0:
@@ -1103,7 +1128,7 @@ class PCE(MetaModel):
                 s_zi = total_sobol_array[par_idx]
 
                 for t_idx in range(n_c_points):
-                    var_yt = np.var(Y_train[output][:, t_idx])
+                    var_yt = np.var(y_train[output][:, t_idx])
                     term1, term2 = 0.0, 0.0
                     if var_yt != 0.0:
                         for i in range(n_comp):
@@ -1164,167 +1189,3 @@ class PCE(MetaModel):
 
         return self.sobol, self.total_sobol
 
-    def plot_sobol(self):
-
-        # ---------------- Plot -----------------------
-        par_names = self.engine.ExpDesign.par_names
-        x_values_orig = self.engine.ExpDesign.x_values
-
-        # Check if the x_values match the number of metamodel outputs
-        if (
-            not np.array(x_values_orig).shape[0]
-            == self.total_sobol[outputs[0]].shape[1]
-        ):
-            print(
-                "The number of MetaModel outputs does not match the x_values"
-                " specified in ExpDesign. Images are created with "
-                " specified in ExpDesign. Images are created with "
-                "equidistant numbers on the x-axis"
-            )
-            x_values_orig = np.arange(0, 1, self.total_sobol[output].shape[0])
-
-        if 1:
-            fig = plt.figure()
-
-            for i_order in range(1, max_order + 1):
-                # Change labels to combined params sets for higher order indices
-                if i_order == 1:
-                    par_names_i = par_names
-                else:
-                    par_names_i = list(combinations(par_names, i_order))
-                for outIdx, output in enumerate(outputs):
-                    # Extract total Sobol indices
-                    sobol = self.sobol[i_order][output][0]
-
-                    # Compute quantiles
-                    q_5 = np.quantile(sobol_all[i_order][output], q=0.05, axis=0)
-                    q_97_5 = np.quantile(sobol_all[i_order][output], q=0.975, axis=0)
-
-                    # Extract a list of x values
-                    if type(x_values_orig) is dict:
-                        x = x_values_orig[output]
-                    else:
-                        x = x_values_orig
-
-                    if plot_type == "bar":
-                        ax = fig.add_axes([0, 0, 1, 1])
-                        dict1 = {xlabel: x}
-                        dict2 = {
-                            param: sobolIndices
-                            for param, sobolIndices in zip(par_names_i, sobol)
-                        }
-
-                        df = pd.DataFrame({**dict1, **dict2})
-                        df.plot(
-                            x=xlabel,
-                            y=par_names_i,
-                            kind="bar",
-                            ax=ax,
-                            rot=0,
-                            colormap="Dark2",
-                            yerr=q_97_5 - q_5,
-                        )
-                        ax.set_ylabel("Sobol indices, $S^T$")
-
-                    else:
-                        for i, sobolIndices in enumerate(sobol):
-                            plt.plot(
-                                x,
-                                sobolIndices,
-                                label=par_names_i[i],
-                                marker="x",
-                                lw=2.5,
-                            )
-                            plt.fill_between(x, q_5[i], q_97_5[i], alpha=0.15)
-
-                        plt.ylabel("Sobol indices, $S^T$")
-                        plt.xlabel(xlabel)
-
-                    plt.title(f"{i_order} degree Sensitivity analysis of {output}")
-                    if plot_type != "bar":
-                        plt.legend(loc="best", frameon=True)
-
-                    # Save indices
-                    np.savetxt(
-                        f"{self.out_dir}sobol_{i_order}_"
-                        + output.replace("/", "_")
-                        + ".csv",
-                        sobol.T,
-                        delimiter=",",
-                        header=",".join(par_names),
-                        comments="",
-                    )
-
-                    # save the current figure
-                    # print(f'{self.out_dir}/{newpath}Sobol_indices_{i_order}_{output}.pdf')
-                    fig.savefig(
-                        f"{self.out_dir}Sobol_indices_{i_order}_{output}.pdf",
-                        bbox_inches="tight",
-                    )
-
-                    # Destroy the current plot
-                    plt.clf()
-
-            fig = plt.figure()
-
-            for outIdx, output in enumerate(outputs):
-                # Extract total Sobol indices
-                total_sobol = self.total_sobol[output]
-
-                # Compute quantiles
-                q_5 = np.quantile(total_sobol_all[output], q=0.05, axis=0)
-                q_97_5 = np.quantile(total_sobol_all[output], q=0.975, axis=0)
-
-                # Extract a list of x values
-                if type(x_values_orig) is dict:
-                    x = x_values_orig[output]
-                else:
-                    x = x_values_orig
-
-                if plot_type == "bar":
-                    ax = fig.add_axes([0, 0, 1, 1])
-                    dict1 = {xlabel: x}
-                    dict2 = {
-                        param: sobolIndices
-                        for param, sobolIndices in zip(par_names, total_sobol)
-                    }
-
-                    df = pd.DataFrame({**dict1, **dict2})
-                    df.plot(
-                        x=xlabel,
-                        y=par_names,
-                        kind="bar",
-                        ax=ax,
-                        rot=0,
-                        colormap="Dark2",
-                        yerr=q_97_5 - q_5,
-                    )
-                    ax.set_ylabel("Total Sobol indices, $S^T$")
-
-                else:
-                    for i, sobolIndices in enumerate(total_sobol):
-                        plt.plot(x, sobolIndices, label=par_names[i], marker="x", lw=2.5)
-                        plt.fill_between(x, q_5[i], q_97_5[i], alpha=0.15)
-
-                    plt.ylabel("Total Sobol indices, $S^T$")
-                    plt.xlabel(xlabel)
-
-                plt.title(f"Sensitivity analysis of {output}")
-                if plot_type != "bar":
-                    plt.legend(loc="best", frameon=True)
-
-                # Save indices
-                np.savetxt(
-                    f"{self.out_dir}totalsobol_" + output.replace("/", "_") + ".csv",
-                    total_sobol.T,
-                    delimiter=",",
-                    header=",".join(par_names),
-                    comments="",
-                )
-
-                # save the current figure
-                fig.savefig(
-                    f"{self.out_dir}TotalSobol_indices_{output}.pdf", bbox_inches="tight"
-                )
-
-                # Destroy the current plot
-                plt.clf()
diff --git a/src/bayesvalidrox/surrogate_models/supplementary.py b/src/bayesvalidrox/surrogate_models/supplementary.py
index fa1042f6d3f89724192e15a69042e104f749783f..bd2aa6ce6d2a175dc45bb8dd211f0215a9a35204 100644
--- a/src/bayesvalidrox/surrogate_models/supplementary.py
+++ b/src/bayesvalidrox/surrogate_models/supplementary.py
@@ -315,3 +315,22 @@ def create_psi(basis_indices, univ_p_val):
             raise err
 
     return psi
+
+def root_mean_squared_error(reference, approximation):
+    """
+    Implementation of RMSE
+
+    Parameters
+    ----------
+    reference : np.array
+        Reference values
+    approximation : np.array
+        Approximation values
+
+    Returns
+    -------
+    rmse : np.array
+        RMSE of approximation against the reference along the samples.
+    """
+    rmse = np.sqrt(np.mean(np.power(reference - approximation, 2), axis=0))
+    return rmse
diff --git a/tests/test_PolynomialChaosEmulator.py b/tests/test_PolynomialChaosEmulator.py
index 64858588d1382caa8df9002fec2ed4c2d7ef9b9a..d67fe0ec144d2a24025dc6bcb18bdfb285f06db7 100644
--- a/tests/test_PolynomialChaosEmulator.py
+++ b/tests/test_PolynomialChaosEmulator.py
@@ -840,6 +840,20 @@ def test_calculate_moments_pca() -> None:
     mm.calculate_moments()
 
 
+def test_calculate_moments_verbose() -> None:
+    """
+    Calculate moments of a pce-surrogate with verbose output
+    """
+    inp = Input()
+    inp.add_marginals()
+    inp.Marginals[0].dist_type = 'normal'
+    inp.Marginals[0].parameters = [0, 1]
+    mm = PCE(inp)
+    mm.verbose = True
+    mm.fit([[0.2], [0.8]], {'Z': [[0.4, 0.4], [0.5, 0.6]]})
+    mm.calculate_moments()
+
+
 #%% Test PCE.update_metamodel
 # TODO: taken from engine
 
@@ -873,3 +887,20 @@ def test_calculate_sobol_pca():
     mm.fit([[0.2], [0.8]], {'Z': [[0.4, 0.4], [0.5, 0.6]]})
     sobol, totalsobol = mm.calculate_sobol({'Z': np.array([[0.4, 0.4], [0.5, 0.6]])})
     # TODO are there theory-related checks that could be applied here?
+
+def test_calculate_sobol_pcanoy():
+    """
+    Calculate Sobol' indices of a pce-surrogate with PCA but no outputs
+    """
+    inp = Input()
+    inp.add_marginals()
+    inp.Marginals[0].dist_type = 'normal'
+    inp.Marginals[0].parameters = [0, 1]
+    mm = PCE(inp)
+    mm.dim_red_method = 'pca'
+    mm.fit([[0.2], [0.8]], {'Z': [[0.4, 0.4], [0.5, 0.6]]})
+
+    with pytest.raises(AttributeError) as excinfo:
+        mm.calculate_sobol()
+    assert str(excinfo.value) == ("Calculation of Sobol' indices with PCA expects training outputs, but none are given.")
+
diff --git a/tests/test_PostProcessing.py b/tests/test_PostProcessing.py
index 322fdc5c3e33d4b3869401703aab12459ede0dfa..5c79a9009f6fd3797fdc9492efed5b1fdd79284a 100644
--- a/tests/test_PostProcessing.py
+++ b/tests/test_PostProcessing.py
@@ -7,19 +7,19 @@ Class PostProcessing:
     plot_moments
     valid_metamodel
     check_accuracy
-    plot_seq_design
+    plot_seq_design_diagnostics
     sobol_indices
+    plot_sobol
     check_req_quality
-    eval_pce_model_3d
-    _get_sample
-    _eval_model
-    _plot_validation
+    plot_metamodel_3d
     _plot_validation_multi
 
 """
+
 import sys
 sys.path.append("../src/")
 import numpy as np
 import pytest
+import os
 
 from bayesvalidrox.post_processing.post_processing import PostProcessing
 from bayesvalidrox.surrogate_models.inputs import Input
@@ -43,6 +43,20 @@ def basic_engine():
     engine = Engine(mm, mod, expdes)
     engine.out_names = ['Z']
     engine.emulator = True
+
+    return engine
+
+@pytest.fixture
+def engine_no_MetaModel():
+    inp = Input()
+    inp.add_marginals()
+    inp.Marginals[0].dist_type = 'normal'
+    inp.Marginals[0].parameters = [0, 1]
+    mod = PL()
+    expdes = ExpDesigns(inp)
+    engine = Engine(None, mod, expdes)
+    engine.trained = True
+
     return engine
 
 @pytest.fixture
@@ -128,8 +142,10 @@ def pce_engine():
     inp = Input()
     inp.add_marginals()
+    inp.Marginals[0].name = 'x'
     inp.Marginals[0].dist_type = 'normal'
     inp.Marginals[0].parameters = [0, 1]
+
     expdes = ExpDesigns(inp)
     expdes.init_param_space(max_deg=1)
     expdes.X = np.array([[0], [1], [0.5]])
@@ -142,37 +158,60 @@ def pce_engine():
     engine = Engine(mm, mod, expdes)
     engine.out_names = ['Z']
     engine.emulator = True
+    engine.trained = True
     return engine
 
 @pytest.fixture
-def gpe_engine():
+def pce_engine_3d_plot():
+    # Initialize the Input object for the problem
     inp = Input()
+
+    # Add marginals for the input dimensions
     inp.add_marginals()
-    inp.Marginals[0].name = 'x'
+    inp.Marginals[0].name = 'x1'
     inp.Marginals[0].dist_type = 'normal'
-    inp.Marginals[0].parameters = [0, 1]
+    inp.Marginals[0].parameters = [0, 1]  # mean = 0, std = 1
+
+    inp.add_marginals()
+    inp.Marginals[1].name = 'x2'
+    inp.Marginals[1].dist_type = 'normal'
+    inp.Marginals[1].parameters = [0, 1]  # mean = 0, std = 1
+
+    # Initialize the experimental design
     expdes = ExpDesigns(inp)
-    expdes.init_param_space(max_deg=1)
-    expdes.X = np.array([[0], [1], [0.5]])
-    expdes.Y = {'Z': [[0.4], [0.5], [0.45]]}
-    expdes.x_values = [0]
+    expdes.init_param_space(max_deg=1)  # Define the degree of the polynomial expansion
 
-    mm = GPESkl(inp)
-    mm.fit(expdes.X, expdes.Y)
-    mod = PL()
+    # Defining the design points and response (output)
+    expdes.Y = {
+        'Z': [[0.4], [0.5], [0.3], [0.4]],
+        'Y': [[0.35], [0.45], [0.40], [0.38]]
+    }
+    expdes.X = np.array([[0,0], [1,0], [0.5,0.3], [0.3,0.7]])
+    expdes.x_values = [0]  # Example x-values (could be used for visualization or plotting)
+
+    # Create and fit the Polynomial Chaos Expansion model (PCE)
+    mm = PCE(inp)  # Initialize the PCE model
+    mm.fit(expdes.X, expdes.Y)  # Fit the model to the design points and outputs
+
+    # Define the model to be emulated
+    mod = PL()
+    # Initialize the Engine with the metamodel, model, and experimental design
     engine = Engine(mm, mod, expdes)
-    engine.out_names = ['Z']
-    engine.emulator = True
-    engine.trained = True
-    return engine
+    engine.out_names = ['Z', 'Y']  # Define the output names
+    engine.emulator = True  # Indicate that the engine is emulating a trained model
+    engine.trained = True  # Mark the engine as trained
+
+    return engine  # Return the configured engine
 
 @pytest.fixture
 def gpe_engine():
     inp = Input()
+
     inp.add_marginals()
     inp.Marginals[0].name = 'x'
     inp.Marginals[0].dist_type = 'normal'
     inp.Marginals[0].parameters = [0, 1]
+
     expdes = ExpDesigns(inp)
     expdes.init_param_space()
     expdes.X = np.array([[0], [1], [0.5]])
@@ -193,15 +232,28 @@ def test_postprocessing_noengine():
     None
 
-def test_postprocessing(basic_engine) -> None:
+def test_postprocessing_untrained_engine(basic_engine) -> None:
     engine = basic_engine
+    with pytest.raises(AttributeError) as excinfo:
+        PostProcessing(engine)
+    assert str(excinfo.value) == 'PostProcessing can only be performed on trained engines.'
+
+def test_postprocessing_no_MetaModel(engine_no_MetaModel) -> None:
+
+    engine = engine_no_MetaModel
+
+    with pytest.raises(AttributeError) as excinfo:
+        PostProcessing(engine)
+    assert str(excinfo.value) == 'PostProcessing can only be performed on engines with a trained MetaModel.'
+
+def test_postprocessing_pce(pce_engine) -> None:
+    engine = pce_engine
     PostProcessing(engine)
 
 def test_postprocessing_gpe(gpe_engine) -> None:
     engine = gpe_engine
     PostProcessing(engine)
 
 #%% plot_moments
-
 def test_plot_moments_pce(pce_engine) -> None:
     """
     Plot moments for PCE metamodel
@@ -265,7 +317,21 @@ def test_plot_moments_gpebar(gpe_engine) -> None:
     assert list(stdev.keys()) == ['Z']
     assert stdev['Z'].shape == (1,)
     assert stdev['Z'][0] == pytest.approx(0.1, abs=0.01)
+
 #%% valid_metamodel
+def test_valid_metamodel_pce(pce_engine):
+    engine = pce_engine
+    post = PostProcessing(engine)
+    samples = np.array([[0], [1], [0.5]])
+    model_out_dict = {'Z': np.array([[0.4], [0.5], [0.45]])}
+    post.valid_metamodel(samples=samples, model_out_dict=model_out_dict)
+
+def test_valid_metamodel_gpe(gpe_engine):
+    engine = gpe_engine
+    post = PostProcessing(engine)
+    samples = np.array([[0], [1], [0.5]])
+    model_out_dict = {'Z': np.array([[0.4], [0.5], [0.45]])}
+    post.valid_metamodel(samples=samples, model_out_dict=model_out_dict)
 
 #%% check_accuracy
 
@@ -284,30 +350,53 @@ def test_check_accuracy_gpe(gpe_engine) -> None:
     engine = gpe_engine
     post = PostProcessing(engine)
     post.check_accuracy(samples = engine.ExpDesign.X, outputs = engine.ExpDesign.Y)
-#%% plot_seq_design
-
-#%% sobol_indices
-
-def test_sobol_indices_nopce(basic_engine) -> None:
+#%% plot_seq_design_diagnostics
+def test_plot_seq_design_diagnostics(basic_engine_sequential):
     """
-    Calculate sobol indices for non-PCE metamodel
+    Test the plot_seq_design_diagnostics method
     """
-    engine = basic_engine
-    post = PostProcessing(engine)
-    with pytest.raises(AttributeError) as excinfo:
-        post.sobol_indices()
-    assert str(excinfo.value) == 'Sobol indices only support PCE-type models!'
+    engine = basic_engine_sequential
+    engine.ExpDesign.n_max_samples = 4
+    engine.ExpDesign.n_init_samples = 3
+    post = PostProcessing(engine)
+    post.plot_seq_design_diagnostics()
+    # Check if the plot was created and saved
+    assert os.path.exists(f"./{post.out_dir}/seq_design_diagnostics/seq_BME.{post.out_format}")
+    assert os.path.exists(f"./{post.out_dir}/seq_design_diagnostics/seq_KLD.{post.out_format}")
+    assert os.path.exists(f"./{post.out_dir}/seq_design_diagnostics/seq_Modified_LOO_error.{post.out_format}")
+    assert os.path.exists(f"./{post.out_dir}/seq_design_diagnostics/seq_RMSEMean.{post.out_format}")
+    assert os.path.exists(f"./{post.out_dir}/seq_design_diagnostics/seq_RMSEStd.{post.out_format}")
+
+#%% sobol_indices
+
 def test_sobol_indices_pce(pce_engine) -> None:
     """
     Calculate sobol indices for PCE metamodel
     """
     engine = pce_engine
     post = PostProcessing(engine)
-    sobol = post.sobol_indices()
-    assert list(sobol.keys()) == ['Z']
-    assert sobol['Z'].shape == (1,1)
-    assert sobol['Z'][0,0] == 1
+    sobol, totalsobol = post.sobol_indices()
+
+    assert list(totalsobol.keys()) == ['Z']
+    assert totalsobol['Z'].shape == (1,1)
+    assert totalsobol['Z'][0,0] == 1
+
+    assert list(sobol.keys()) == [1]
+    assert list(sobol[1].keys()) == ['Z']
+    assert sobol[1]['Z'].shape == (1,1,1)
+    assert sobol[1]['Z'][0,0] == 1
+
+def test_sobol_indices_with_invalid_model_type(gpe_engine) -> None:
+    """
+    Calculate sobol indices with invalid model type
+    """
+    engine = gpe_engine
+    post = PostProcessing(engine)
+    post.model_type = 'INVALID'
+    with pytest.raises(AttributeError) as excinfo:
+        post.sobol_indices()
+    assert "Sobol indices only support PCE-type models!" in str(excinfo.value)
 
 #%% check_reg_quality
 
@@ -319,40 +409,58 @@ def test_check_reg_quality_pce(pce_engine) -> None:
     post = PostProcessing(engine)
     post.check_reg_quality(samples=engine.ExpDesign.X, outputs=engine.ExpDesign.Y)
 
-#%% eplot_metamodel_3d
-
-def test_plot_metamodel_3d_nopce(basic_engine) -> None:
+def test_check_reg_quality_gpe(gpe_engine) -> None:
     """
-    3d eval of non-PCE metamodel
+    Check the regression quality for GPE metamodel
     """
-    engine = basic_engine
+    engine = gpe_engine
     post = PostProcessing(engine)
-    with pytest.raises(AttributeError) as excinfo:
-        post.plot_metamodel_3d()
-    assert str(excinfo.value) == 'This function is only applicable if the MetaModel input dimension is 2.'
-
+    post.check_reg_quality(samples=engine.ExpDesign.X, outputs=engine.ExpDesign.Y)
+    # Add assertions to check the quality metrics if available
 
 #%% plot_metamodel_3d
 
 def test_plot_metamodel_3d_pce(pce_engine_3d_plot) -> None:
     """
-    Plot validation of non-PCE metamodel
+    Test the plot_metamodel_3d method for PCE metamodel
     """
-    engine = basic_engine
-    samples = engine.ExpDesign.generate_samples(10,'random')
+    engine = pce_engine_3d_plot
     post = PostProcessing(engine)
-    with pytest.raises(AttributeError) as excinfo:
-        post._plot_validation(samples)
-    assert str(excinfo.value) == 'This evaluation only support PCE-type models!'
-
-#%% _plot_validation_multi
-
-def test_plot_validation_multi_nopce(basic_engine) -> None:
+    post.plot_metamodel_3d()
+    # Check if the plot was created and saved
+    assert os.path.exists(f"./{post.out_dir}/3DPlot_MetaModel_Z0.{post.out_format}")
+    assert os.path.exists(f"./{post.out_dir}/3DPlot_MetaModel_Y0.{post.out_format}")
+
+
+#%% _plot_validation_multi only for PCE
+def test_plot_validation_multi(pce_engine_3d_plot):
     """
-    Plot multi-validation of non-PCE metamodel
+    Test the _plot_validation_multi method
     """
-    engine = basic_engine
+    engine = pce_engine_3d_plot
     post = PostProcessing(engine)
-    with pytest.raises(AttributeError) as excinfo:
-        post._plot_validation_multi()
-    assert str(excinfo.value) == 'This evaluation only support PCE-type models!'
-    
\ No newline at end of file
+    y_val = {'Z': np.array([[1], [2], [3], [4], [5]]),
+             'Y': np.array([[1], [2], [3], [4], [5]])}
+    y_val_std = {'Z': np.array([[0.1], [0.2], [0.3], [0.4], [0.5]]),
+                 'Y': np.array([[0.1], [0.2], [0.3], [0.4], [0.5]])}
+    model_out = {'Z': np.array([[1.5],[2],[3.5],[4],[4.5]]),
+                 'Y': np.array([[1.5],[2],[3.5],[4],[4.5]])}
+    post._plot_validation_multi(y_val, y_val_std, model_out)
+    # Check if the plot was created and saved
+    assert os.path.exists(f"./{post.out_dir}/Model_vs_pceModel_Y.{post.out_format}")
+    assert os.path.exists(f"./{post.out_dir}/Model_vs_pceModel_Z.{post.out_format}")
+
+def test_plot_validation_multi_pce(pce_engine):
+    engine = pce_engine
+    post = PostProcessing(engine)
+    out_mean = {'Z': np.array([[0.4], [0.5], [0.45], [0.4]])}
+    out_std = {'Z': np.array([[0.1], [0.1], [0.1], [0.1]])}
+    model_out_dict = {'Z': np.array([[0.4], [0.5],[0.3],[0.4]])}
+    post._plot_validation_multi(out_mean, out_std, model_out_dict)
+
+def test_plot_validation_multi_gpe(gpe_engine):
+    engine = gpe_engine
+    post = PostProcessing(engine)
+    out_mean = {'Z': np.array([[0.4], [0.5], [0.45]])}
+    out_std = {'Z': np.array([[0.1], [0.1], [0.1]])}
+    model_out_dict = {'Z': np.array([[0.4], [0.5], [0.45]])}
+    post._plot_validation_multi(out_mean, out_std, model_out_dict)
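Usage sketch for the relocated RMSE helper (illustrative only, not part of the patch; import path as introduced in supplementary.py above). The reduction runs over the sample axis (axis 0), so inputs of shape (n_samples, n_points) yield one RMSE per point:

    import numpy as np
    from bayesvalidrox.surrogate_models.supplementary import root_mean_squared_error

    reference = np.array([[0.4, 0.4], [0.5, 0.6]])          # e.g. two model runs
    approximation = np.array([[0.41, 0.38], [0.52, 0.61]])  # surrogate predictions

    rmse = root_mean_squared_error(reference, approximation)
    print(rmse.shape)  # (2,) -- one RMSE value per output column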
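Behaviour sketch for the new PCA guard in PCE.calculate_sobol (illustrative, mirroring the added tests test_calculate_sobol_pcanoy and test_calculate_sobol_pca): with dim_red_method set to 'pca', calling calculate_sobol() without training outputs now raises an AttributeError up front instead of failing later on the missing y_train:

    import numpy as np
    import pytest
    from bayesvalidrox.surrogate_models.inputs import Input
    from bayesvalidrox.surrogate_models.polynomial_chaos import PCE

    inp = Input()
    inp.add_marginals()
    inp.Marginals[0].dist_type = 'normal'
    inp.Marginals[0].parameters = [0, 1]

    mm = PCE(inp)
    mm.dim_red_method = 'pca'
    mm.fit([[0.2], [0.8]], {'Z': [[0.4, 0.4], [0.5, 0.6]]})

    # Without training outputs the guard raises immediately
    with pytest.raises(AttributeError):
        mm.calculate_sobol()

    # Passing y_train restores the Sobol' computation
    sobol, total_sobol = mm.calculate_sobol({'Z': np.array([[0.4, 0.4], [0.5, 0.6]])})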