Package bayesvalidrox
# -*- coding: utf-8 -*-
__version__ = "0.0.2"
from .pylink.pylink import PyLinkForwardModel
from .surrogate_models.surrogate_models import MetaModel
from .surrogate_models.inputs import Input
from .post_processing.post_processing import PostProcessing
from .bayes_inference.bayes_inference import BayesInference
from .bayes_inference.discrepancy import Discrepancy
__all__ = [
"__version__",
"PyLinkForwardModel",
"Input",
"Discrepancy",
"MetaModel",
"PostProcessing",
"BayesInference"
]
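The names listed in __all__ are re-exported at the package root, so they can be imported directly; a short usage sketch:

>>> import bayesvalidrox as bvr
>>> model = bvr.PyLinkForwardModel()
>>> inputs = bvr.Input()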
Sub-modules
bayesvalidrox.bayes_inference
bayesvalidrox.post_processing
bayesvalidrox.pylink
bayesvalidrox.surrogate_models
Classes
class PyLinkForwardModel (link_type='pylink', name=None, py_file=None, shell_command='', input_file=None, input_template=None, aux_file=None, exe_path='', output_file_names=[], output_names=[], output_parser='', multi_process=True, n_cpus=None, meas_file=None, meas_file_valid=None, mc_ref_file=None, obs_dict={}, obs_dict_valid={}, mc_ref_dict={})
-
A forward model binder
This class serves as a code wrapper. The wrapper allows the execution of a third-party software/solver within the scope of BayesValidRox.
Attributes
link_type : str
- The type of the wrapper. The default is 'pylink'. This runs the third-party software or an executable using a shell command with given input files. The second option is 'function', which assumes that the model can be run using a function written separately in a Python script.
name : str
- Name of the model.
py_file : str
- Python file name without the .py extension to be run for the 'function' wrapper. Note that the name of the Python file and that of the function must be identical. This function must receive the parameters in an array of shape (n_samples, n_params) and return a dictionary with the x_values and output arrays for the given output names.
shell_command : str
- Shell command to be executed for the 'pylink' wrapper.
input_file : str or list
- The input file to be passed to the 'pylink' wrapper.
input_template : str or list
- A template input file to be passed to the 'pylink' wrapper. This file must be a copy of input_file with a <Xi> placeholder for each input parameter defined via the Input class, with i being the parameter number. The file name ending should include .tpl before the actual extension of the input file, for example, params.tpl.input.
aux_file : str or list
- The list of auxiliary files needed for the 'pylink' wrapper.
exe_path : str
- Execution path if you wish to run the model for the 'pylink' wrapper in another directory. The default is None, which corresponds to the current working directory.
output_file_names : list of str
- List of the names of the model output text files for the 'pylink' wrapper.
output_names : list of str
- List of the model outputs to be used for the analysis.
output_parser : str
- Name of the model parser file (without the .py extension) that receives the output_file_names and returns a 2d-array with the first row being the x_values, e.g. x coordinates or time, while the remaining rows pass the simulation output for each model output defined in output_names. Note that again the name of the file and that of the function must be the same.
multi_process : bool
- Whether the model runs are to be executed in parallel for the 'pylink' wrapper. The default is True.
n_cpus : int
- The number of CPUs to be used for parallel model execution with the 'pylink' wrapper. The default is None, which corresponds to all available CPUs.
meas_file : str
- The name of the measurement text-based file. This file must contain x_values as the first column and one column for each model output. The default is None. Only needed for Bayesian inference.
meas_file_valid : str
- The name of the measurement text-based file for validation. The default is None. Only needed for validation with Bayesian inference.
mc_ref_file : str
- The name of the text file for the Monte-Carlo reference (mean and standard deviation) values. It must contain x_values as the first column, mean as the second column and std as the third. It can be used to compare the moments estimated with the meta-model in the post-processing step. This is only available for one output.
obs_dict : dict
- A dictionary containing the measurement data. It must contain x_values as the first item and one item for each model output. The default is {}. Only needed for Bayesian inference.
obs_dict_valid : dict
- A dictionary containing the validation measurement data. It must contain x_values as the first item and one item for each model output. The default is {}.
mc_ref_dict : dict
- A dictionary containing the Monte-Carlo reference (mean and standard deviation) values. It must contain x_values as the first item, mean as the second item and std as the third. The default is {}. This is only available for one output.
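For orientation, a minimal 'pylink' model definition might look as follows. This is a sketch only; all file, solver and output names are hypothetical:

>>> model = PyLinkForwardModel(
...     name='mysolver',                    # hypothetical solver name
...     shell_command='mysolver params.inp',
...     input_file='params.inp',
...     input_template='params.tpl.inp',    # copy of params.inp with <X1>, <X2>, ... placeholders
...     output_file_names=['results.out'],
...     output_names=['velocity'],
...     output_parser='read_results',       # expects read_results.py defining read_results(...)
...     meas_file='measurements.csv')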
# Module-level imports assumed by this class (not shown in the original
# listing): os, time, shutil, zipfile, multiprocessing, h5py,
# numpy as np, pandas as pd, tqdm

class PyLinkForwardModel(object):
    """
    A forward model binder

    This class serves as a code wrapper. This wrapper allows the execution
    of a third-party software/solver within the scope of BayesValidRox.

    Attributes
    ----------
    link_type : str
        The type of the wrapper. The default is `'pylink'`. This runs the
        third-party software or an executable using a shell command with
        given input files. The second option is `'function'`, which assumes
        that the model can be run using a function written separately in a
        Python script.
    name : str
        Name of the model.
    py_file : str
        Python file name without the `.py` extension to be run for the
        `'function'` wrapper. Note that the name of the Python file and
        that of the function must be identical. This function must receive
        the parameters in an array of shape `(n_samples, n_params)` and
        return a dictionary with the x_values and output arrays for the
        given output names.
    shell_command : str
        Shell command to be executed for the `'pylink'` wrapper.
    input_file : str or list
        The input file to be passed to the `'pylink'` wrapper.
    input_template : str or list
        A template input file to be passed to the `'pylink'` wrapper. This
        file must be a copy of `input_file` with a `<Xi>` placeholder for
        each input parameter defined via the `Input` class, with i being
        the parameter number. The file name ending should include `.tpl`
        before the actual extension of the input file, for example,
        `params.tpl.input`.
    aux_file : str or list
        The list of auxiliary files needed for the `'pylink'` wrapper.
    exe_path : str
        Execution path if you wish to run the model for the `'pylink'`
        wrapper in another directory. The default is `None`, which
        corresponds to the current working directory.
    output_file_names : list of str
        List of the names of the model output text files for the `'pylink'`
        wrapper.
    output_names : list of str
        List of the model outputs to be used for the analysis.
    output_parser : str
        Name of the model parser file (without the `.py` extension) that
        receives the `output_file_names` and returns a 2d-array with the
        first row being the x_values, e.g. x coordinates or time, while the
        remaining rows pass the simulation output for each model output
        defined in `output_names`. Note that again the name of the file and
        that of the function must be the same.
    multi_process : bool
        Whether the model runs are to be executed in parallel for the
        `'pylink'` wrapper. The default is `True`.
    n_cpus : int
        The number of CPUs to be used for parallel model execution with the
        `'pylink'` wrapper. The default is `None`, which corresponds to all
        available CPUs.
    meas_file : str
        The name of the measurement text-based file. This file must contain
        x_values as the first column and one column for each model output.
        The default is `None`. Only needed for Bayesian inference.
    meas_file_valid : str
        The name of the measurement text-based file for validation. The
        default is `None`. Only needed for validation with Bayesian
        inference.
    mc_ref_file : str
        The name of the text file for the Monte-Carlo reference (mean and
        standard deviation) values. It must contain `x_values` as the first
        column, `mean` as the second column and `std` as the third. It can
        be used to compare the moments estimated with the meta-model in the
        post-processing step. This is only available for one output.
    obs_dict : dict
        A dictionary containing the measurement data. It must contain
        `x_values` as the first item and one item for each model output.
        The default is `{}`. Only needed for Bayesian inference.
    obs_dict_valid : dict
        A dictionary containing the validation measurement data. It must
        contain `x_values` as the first item and one item for each model
        output. The default is `{}`.
    mc_ref_dict : dict
        A dictionary containing the Monte-Carlo reference (mean and
        standard deviation) values. It must contain `x_values` as the first
        item, `mean` as the second item and `std` as the third. The default
        is `{}`. This is only available for one output.
    """

    def __init__(self, link_type='pylink', name=None, py_file=None,
                 shell_command='', input_file=None, input_template=None,
                 aux_file=None, exe_path='', output_file_names=[],
                 output_names=[], output_parser='', multi_process=True,
                 n_cpus=None, meas_file=None, meas_file_valid=None,
                 mc_ref_file=None, obs_dict={}, obs_dict_valid={},
                 mc_ref_dict={}):
        self.link_type = link_type
        self.name = name
        self.shell_command = shell_command
        self.py_file = py_file
        self.input_file = input_file
        self.input_template = input_template
        self.aux_file = aux_file
        self.exe_path = exe_path
        self.multi_process = multi_process
        self.n_cpus = n_cpus
        self.Output.parser = output_parser
        self.Output.names = output_names
        self.Output.file_names = output_file_names
        self.meas_file = meas_file
        self.meas_file_valid = meas_file_valid
        self.mc_ref_file = mc_ref_file
        self.observations = obs_dict
        self.observations_valid = obs_dict_valid
        self.mc_reference = mc_ref_dict

    # Nested class
    class Output:
        def __init__(self):
            self.parser = None
            self.names = None
            self.file_names = None

    # -------------------------------------------------------------------------
    def within_range(self, out, minout, maxout):
        inside = False
        if (out > minout).all() and (out < maxout).all():
            inside = True
        return inside

    # -------------------------------------------------------------------------
    def read_observation(self, case='calib'):
        """
        Reads/prepares the observation/measurement data for calibration.

        Returns
        -------
        DataFrame
            A dataframe with the calibration data.
        """
        if case.lower() == 'calib':
            if isinstance(self.observations, dict) and bool(self.observations):
                obs = pd.DataFrame.from_dict(self.observations)
            elif self.meas_file is not None:
                file_path = os.path.join(os.getcwd(), self.meas_file)
                obs = pd.read_csv(file_path, delimiter=',')
            elif isinstance(self.observations, pd.DataFrame):
                obs = self.observations
            else:
                raise Exception("Please provide the observation data as a "
                                "dictionary via the observations attribute "
                                "or pass the csv-file path to the meas_file "
                                "attribute")
        elif case.lower() == 'valid':
            if isinstance(self.observations_valid, dict) and \
               bool(self.observations_valid):
                obs = pd.DataFrame.from_dict(self.observations_valid)
            elif self.meas_file_valid is not None:
                file_path = os.path.join(os.getcwd(), self.meas_file_valid)
                obs = pd.read_csv(file_path, delimiter=',')
            elif isinstance(self.observations_valid, pd.DataFrame):
                obs = self.observations_valid
            else:
                raise Exception("Please provide the observation data as a "
                                "dictionary via the observations_valid "
                                "attribute or pass the csv-file path to the "
                                "meas_file_valid attribute")

        # Compute the number of observations
        n_obs = obs[self.Output.names].notnull().sum().values.sum()

        if case.lower() == 'calib':
            self.observations = obs
            self.n_obs = n_obs
            return self.observations
        elif case.lower() == 'valid':
            self.observations_valid = obs
            self.n_obs_valid = n_obs
            return self.observations_valid

    # -------------------------------------------------------------------------
    def read_mc_reference(self):
        """
        Reads the Monte-Carlo reference, if available, for further in-depth
        post-processing after meta-model training.

        Returns
        -------
        None
        """
        if self.mc_ref_file is None and not hasattr(self, 'mc_reference'):
            return
        elif isinstance(self.mc_reference, dict) and bool(self.mc_reference):
            self.mc_reference = pd.DataFrame.from_dict(self.mc_reference)
        elif self.mc_ref_file is not None:
            file_path = os.path.join(os.getcwd(), self.mc_ref_file)
            self.mc_reference = pd.read_csv(file_path, delimiter=',')
        else:
            raise Exception("Please provide the MC reference data as a "
                            "dictionary via the mc_reference attribute or "
                            "pass the csv-file path to the mc_ref_file "
                            "attribute")
        return self.mc_reference

    # -------------------------------------------------------------------------
    def read_output(self):
        """
        Loads the user-defined output parser function and applies it to the
        model output files. It is required when the model returns the
        simulation outputs in CSV files.

        Returns
        -------
        output : array-like
            The parsed simulation output.
        """
        output_func_name = self.Output.parser
        output_func = getattr(__import__(output_func_name), output_func_name)

        file_names = []
        for File in self.Output.file_names:
            file_names.append(os.path.join(self.exe_path, File))
        try:
            output = output_func(self.name, file_names)
        except TypeError:
            output = output_func(file_names)
        return output

    # -------------------------------------------------------------------------
    def update_input_params(self, new_input_file, param_set):
        """
        Finds the `<Xi>` placeholder patterns in the new input files and
        replaces them with the corresponding values from the array
        param_set.

        Parameters
        ----------
        new_input_file : list
            List of the input files with the adapted names.
        param_set : array of shape (n_params)
            Parameter set.

        Returns
        -------
        None.
        """
        NofPa = param_set.shape[0]
        text_to_search_list = [f'<X{i+1}>' for i in range(NofPa)]

        for filename in new_input_file:
            # Read in the file
            with open(filename, 'r') as file:
                filedata = file.read()

            # Replace the target strings
            for text_to_search, params in zip(text_to_search_list, param_set):
                filedata = filedata.replace(text_to_search, f'{params:0.4e}')

            # Write the file out again
            with open(filename, 'w') as file:
                file.write(filedata)

    # -------------------------------------------------------------------------
    def run_command(self, command, output_file_names):
        """
        Runs the execution command given by the user to run the model. It
        checks whether the output files have been generated. If yes, the
        job is done and it extracts and returns the requested output(s);
        otherwise, it executes the command again.

        Parameters
        ----------
        command : str
            The shell command to be executed.
        output_file_names : list
            Names of the output files.

        Returns
        -------
        simulation_outputs : array of shape (n_obs, n_outputs)
            Simulation outputs.
        """
        # Re-run the command until all expected output files exist,
        # i.e. until the simulation has finished.
        while True:
            time.sleep(3)
            files = os.listdir(".")
            if all(elem in files for elem in output_file_names):
                break
            else:
                # Run command
                Process = os.system(f'./../{command}')
                if Process != 0:
                    print('\nMessage 1:')
                    print(f"\tThe command returned the non-zero exit code "
                          f"'{Process}', which indicates compilation or "
                          "execution problems.\n")
        os.chdir("..")

        # Read the output
        simulation_outputs = self.read_output()

        return simulation_outputs

    # -------------------------------------------------------------------------
    def run_forwardmodel(self, xx):
        """
        This function creates a subdirectory for the current run, copies
        the necessary files to this directory and renames them. Next, it
        executes the given command.

        Parameters
        ----------
        xx : tuple
            A tuple including the parameter set, simulation number and key
            string.

        Returns
        -------
        output : array of shape (n_outputs+1, n_obs)
            An array passed by the output parser containing the x_values as
            the first row and the simulation results stored in the rest of
            the array.
        """
        c_points, run_no, key_str = xx

        # Handle if only one input file is provided
        if not isinstance(self.input_template, list):
            self.input_template = [self.input_template]
        if not isinstance(self.input_file, list):
            self.input_file = [self.input_file]

        new_input_file = []
        # Loop over the input templates:
        for in_temp in self.input_template:
            if '/' in in_temp:
                in_temp = in_temp.split('/')[-1]
            new_input_file.append(in_temp.split('.tpl')[0] + key_str +
                                  f"_{run_no+1}" + in_temp.split('.tpl')[1])

        # Create directories
        newpath = self.name + key_str + f'_{run_no+1}'
        if not os.path.exists(newpath):
            os.makedirs(newpath)

        # Copy the necessary files to the directories
        for in_temp in self.input_template:
            # Input file(s) of the model
            shutil.copy2(in_temp, newpath)
        # Auxiliary file
        if self.aux_file is not None:
            shutil.copy2(self.aux_file, newpath)  # Auxiliary file

        # Rename the input file and/or auxiliary file
        os.chdir(newpath)
        for input_tem, input_file in zip(self.input_template,
                                         new_input_file):
            if '/' in input_tem:
                input_tem = input_tem.split('/')[-1]
            os.rename(input_tem, input_file)

        # Update the parameters in the input file
        self.update_input_params(new_input_file, c_points)

        # Update the user-defined command and the execution path
        try:
            new_command = self.shell_command.replace(self.input_file[0],
                                                     new_input_file[0])
            new_command = new_command.replace(self.input_file[1],
                                              new_input_file[1])
        except IndexError:
            # Only a single input file was provided
            new_command = self.shell_command.replace(self.input_file[0],
                                                     new_input_file[0])

        # Set the exe path if not provided
        if not bool(self.exe_path):
            self.exe_path = os.getcwd()

        # Run the model
        output = self.run_command(new_command, self.Output.file_names)

        return output

    # -------------------------------------------------------------------------
    def run_model_parallel(self, c_points, prevRun_No=0, key_str='',
                           mp=True):
        """
        Runs model simulations. If mp is true (default), then the
        simulations are started in parallel.

        Parameters
        ----------
        c_points : array of shape (n_samples, n_params)
            Collocation points (training set).
        prevRun_No : int, optional
            Previous run number, in case the sequential design is selected.
            The default is `0`.
        key_str : str, optional
            A descriptive string for validation runs. The default is `''`.
        mp : bool, optional
            Multiprocessing. The default is `True`.

        Returns
        -------
        all_outputs : dict
            A dictionary with x values (time step or point id) and all
            outputs. Each key contains an array of the shape
            `(n_samples, n_obs)`.
        new_c_points : array
            Updated collocation points (training set). If a simulation did
            not execute successfully, the parameter set is removed.
        """
        # Create hdf5 metadata
        hdf5file = f'ExpDesign_{self.name}.hdf5'
        hdf5_exist = os.path.exists(hdf5file)
        file = h5py.File(hdf5file, 'a')

        # Initialization
        n_c_points = len(c_points)
        self.n_outputs = len(self.Output.names)
        all_outputs = {}

        # Extract the function
        if self.link_type.lower() == 'function':
            # Prepare the function
            Function = getattr(__import__(self.py_file), self.py_file)

        # ---------------------------------------------------------------
        # -------------- Multiprocessing with Pool Class ----------------
        # ---------------------------------------------------------------
        # Start a pool with the number of CPUs
        if self.n_cpus is None:
            n_cpus = multiprocessing.cpu_count()
        else:
            n_cpus = self.n_cpus

        # Run forward model either normally or with multiprocessing
        if not self.multi_process:
            group_results = list([self.run_forwardmodel((c_points,
                                                         prevRun_No,
                                                         key_str))])
        else:
            with multiprocessing.Pool(n_cpus) as p:
                desc = f'Running forward model {key_str}'
                if self.link_type.lower() == 'function':
                    imap_var = p.imap(Function, c_points[:, np.newaxis])
                else:
                    args = zip(c_points,
                               [prevRun_No+i for i in range(n_c_points)],
                               [key_str]*n_c_points)
                    imap_var = p.imap(self.run_forwardmodel, args)

                group_results = list(tqdm.tqdm(imap_var, total=n_c_points,
                                               desc=desc))

        # Save time steps or x-values
        x_values = group_results[0][0]
        all_outputs["x_values"] = x_values
        if not hdf5_exist:
            if type(x_values) is dict:
                grp_x_values = file.create_group("x_values/")
                for varIdx, var in enumerate(self.Output.names):
                    grp_x_values.create_dataset(var, data=x_values[var])
            else:
                file.create_dataset("x_values", data=x_values)

        # Save each output in its corresponding array
        NaN_idx = []
        for varIdx, var in enumerate(self.Output.names):

            if not hdf5_exist:
                grpY = file.create_group("EDY/"+var)
            else:
                grpY = file.get("EDY/"+var)

            Outputs = np.asarray([item[varIdx+1] for item in group_results],
                                 dtype=np.float64)

            if prevRun_No == 0 and key_str == '':
                grpY.create_dataset(f'init_{key_str}', data=Outputs)
            else:
                try:
                    oldEDY = np.array(file[f'EDY/{var}/adaptive_{key_str}'])
                    del file[f'EDY/{var}/adaptive_{key_str}']
                    data = np.vstack((oldEDY, Outputs))
                except KeyError:
                    data = Outputs
                grpY.create_dataset('adaptive_'+key_str, data=data)

            # Drop the parameter sets whose simulations returned NaNs
            NaN_idx = np.unique(np.argwhere(np.isnan(Outputs))[:, 0])
            all_outputs[var] = np.delete(Outputs, NaN_idx, axis=0)

            if prevRun_No == 0 and key_str == '':
                grpY.create_dataset(f"New_init_{key_str}",
                                    data=all_outputs[var])
            else:
                try:
                    name = f'EDY/{var}/New_adaptive_{key_str}'
                    oldEDY = np.array(file[name])
                    del file[f'EDY/{var}/New_adaptive_{key_str}']
                    data = np.vstack((oldEDY, all_outputs[var]))
                except KeyError:
                    data = all_outputs[var]
                grpY.create_dataset(f'New_adaptive_{key_str}', data=data)

        # Print the collocation points whose simulations crashed
        if len(NaN_idx) != 0:
            print('\n')
            print('*'*20)
            print("\nThe following parameter sets have been removed:\n",
                  c_points[NaN_idx])
            print("\n")
            print('*'*20)

        # Pass it to the attribute
        new_c_points = np.delete(c_points, NaN_idx, axis=0)
        self.OutputMatrix = all_outputs

        # Save the collocation points
        grpX = file.create_group("EDX") if not hdf5_exist else file.get("EDX")
        if prevRun_No == 0 and key_str == '':
            grpX.create_dataset("init_"+key_str, data=c_points)
            if len(NaN_idx) != 0:
                grpX.create_dataset("New_init_"+key_str, data=new_c_points)
        else:
            try:
                name = f'EDX/adaptive_{key_str}'
                oldCollocationPoints = np.array(file[name])
                del file[f'EDX/adaptive_{key_str}']
                data = np.vstack((oldCollocationPoints, new_c_points))
            except KeyError:
                data = new_c_points
            grpX.create_dataset('adaptive_'+key_str, data=data)

            if len(NaN_idx) != 0:
                try:
                    name = f'EDX/New_adaptive_{key_str}'
                    oldCollocationPoints = np.array(file[name])
                    del file[f'EDX/New_adaptive_{key_str}']
                    data = np.vstack((oldCollocationPoints, new_c_points))
                except KeyError:
                    data = new_c_points
                grpX.create_dataset('New_adaptive_'+key_str, data=data)

        # Close h5py file
        file.close()

        return all_outputs, new_c_points

    # -------------------------------------------------------------------------
    def zip_subdirs(self, dir_name, key):
        """
        Zips all files in the run directories whose names contain the given
        key(word), then removes those directories.

        Parameters
        ----------
        dir_name : str
            Directory (and zip archive) name.
        key : str
            Keyword to search for.

        Returns
        -------
        None.
        """
        # Set up file path variables
        dir_list = []
        file_paths = []

        # Read all directories, subdirectories and file lists
        dir_path = os.getcwd()

        for root, directories, files in os.walk(dir_path):
            for directory in directories:
                # Collect the directories matching the keyword
                if key in directory:
                    folderPath = os.path.join(dir_path, directory)
                    dir_list.append(folderPath)

        # Loop over the identified directories to store the file paths
        for direct_name in dir_list:
            for root, directories, files in os.walk(direct_name):
                for filename in files:
                    # Create the full file path using the os module
                    filePath = os.path.join(root, filename)
                    file_paths.append('.'+filePath.split(dir_path)[1])

        # Write the files to a zip archive
        if len(file_paths) != 0:
            zip_file = zipfile.ZipFile(dir_name+'.zip', 'w')
            with zip_file:
                # Write each file one by one
                for file in file_paths:
                    zip_file.write(file)

            # Remove the zipped directories
            file_paths = [path for path in os.listdir('.') if key in path]
            for path in file_paths:
                shutil.rmtree(path)

            print("\n")
            print(f'{dir_name}.zip file has been created successfully!\n')

        return
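For the 'function' wrapper, py_file names a script that defines a function with the same name. A minimal sketch of such a script, assuming a single output group called 'velocity' (the file name, output name and toy response are illustrative only):

# my_model.py -- to be used with link_type='function', py_file='my_model'
import numpy as np

def my_model(samples):
    # samples: array of shape (n_samples, n_params)
    x_values = np.linspace(0, 1, 10)
    # Toy response: scale a sine curve by the first parameter
    outputs = samples[:, [0]] * np.sin(2 * np.pi * x_values)[np.newaxis, :]
    # Return a dict with x_values and one array per entry of output_names
    return {'x_values': x_values, 'velocity': outputs}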
Class variables
var Output
Methods
def within_range(self, out, minout, maxout)
- Checks whether all entries of out lie strictly between minout and maxout.
def read_observation(self, case='calib')
-
Reads/prepares the observation/measurement data for calibration.
Returns
DataFrame
- A dataframe with the calibration data.
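As a usage sketch (building on the hypothetical model and 'velocity' output from above), observations can be passed as a dictionary before calling this method:

>>> model.observations = {'x_values': [0.0, 0.5, 1.0],
...                       'velocity': [1.2, 0.8, 0.3]}
>>> obs_df = model.read_observation(case='calib')  # returns a pandas DataFrame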
def read_mc_reference(self)
-
Reads the Monte-Carlo reference, if available, for further in-depth post-processing after meta-model training.
Returns
None
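The expected layout of the Monte-Carlo reference, shown here as a dictionary with made-up values:

>>> model.mc_reference = {'x_values': [0.0, 0.5, 1.0],
...                       'mean': [1.1, 0.7, 0.4],
...                       'std': [0.05, 0.04, 0.06]}
>>> model.read_mc_reference()  # converts the dict to a pandas DataFrame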
def read_output(self)
-
Loads the user-defined output parser function and applies it to the model output files. It is required when the model returns the simulation outputs in CSV files.
Returns
output : array-like
- The parsed simulation output.
def update_input_params(self, new_input_file, param_set)
-
Finds the <Xi> placeholder patterns in the new input files and replaces them with the corresponding values from the array param_set.
Parameters
new_input_file : list
- List of the input files with the adapted names.
param_set : array of shape (n_params)
- Parameter set.
Returns
None.
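To illustrate the substitution (file names and contents are hypothetical): if params_1.inp contains the line viscosity = <X1>, then

>>> import numpy as np
>>> model.update_input_params(['params_1.inp'], np.array([1.23]))

rewrites that line as viscosity = 1.2300e+00.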
def run_command(self, command, output_file_names)
-
Runs the execution command given by the user to run the model. It checks whether the output files have been generated. If yes, the job is done and it extracts and returns the requested output(s); otherwise, it executes the command again.
Parameters
command : str
- The shell command to be executed.
output_file_names : list
- Names of the output files.
Returns
simulation_outputs : array of shape (n_obs, n_outputs)
- Simulation outputs.
def run_forwardmodel(self, xx)
-
This function creates a subdirectory for the current run, copies the necessary files to this directory and renames them. Next, it executes the given command.
Parameters
xx : tuple
- A tuple including the parameter set, simulation number and key string.
Returns
output : array of shape (n_outputs+1, n_obs)
- An array passed by the output parser containing the x_values as the first row and the simulation results stored in the rest of the array.
def run_model_parallel(self, c_points, prevRun_No=0, key_str='', mp=True)
-
Runs model simulations. If mp is true (default), then the simulations are started in parallel.
Parameters
c_points : array of shape (n_samples, n_params)
- Collocation points (training set).
prevRun_No : int, optional
- Previous run number, in case the sequential design is selected. The default is 0.
key_str : str, optional
- A descriptive string for validation runs. The default is ''.
mp : bool, optional
- Multiprocessing. The default is True.
Returns
all_outputs : dict
- A dictionary with x values (time step or point id) and all outputs. Each key contains an array of shape (n_samples, n_obs).
new_c_points : array
- Updated collocation points (training set). If a simulation did not execute successfully, the corresponding parameter set is removed.
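A usage sketch, assuming the hypothetical two-parameter model defined earlier:

>>> import numpy as np
>>> samples = np.random.uniform(-5, 5, size=(50, 2))  # 50 parameter sets
>>> all_outputs, kept_samples = model.run_model_parallel(samples)
>>> all_outputs['velocity'].shape  # (n_kept_samples, n_obs)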
def zip_subdirs(self, dir_name, key)
-
Zips all files in the run directories whose names contain the given key(word), then removes those directories.
Parameters
dir_name : str
- Directory (and zip archive) name.
key : str
- Keyword to search for.
Returns
None.
class Input
-
A class to define the uncertain input parameters.
Attributes
Marginals : obj
- Marginal objects. See inputs.Marginal.
Rosenblatt : bool
- If a Rosenblatt transformation is required for dependent input parameters.
Examples
Marginals can be defined as follows:
>>> Inputs.add_marginals()
>>> Inputs.Marginals[0].name = 'X_1'
>>> Inputs.Marginals[0].dist_type = 'uniform'
>>> Inputs.Marginals[0].parameters = [-5, 5]
If no common distribution is available, the input data can be passed directly:
>>> Inputs.add_marginals()
>>> Inputs.Marginals[0].name = 'X_1'
>>> Inputs.Marginals[0].input_data = input_data
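Building on the examples above, a complete two-parameter prior might be sketched as:

>>> Inputs = Input()
>>> for i, name in enumerate(['X_1', 'X_2']):
...     Inputs.add_marginals()
...     Inputs.Marginals[i].name = name
...     Inputs.Marginals[i].dist_type = 'uniform'
...     Inputs.Marginals[i].parameters = [-5, 5]
>>> Inputs.Rosenblatt = True  # only needed for dependent parameters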
class Input:
    """
    A class to define the uncertain input parameters.

    Attributes
    ----------
    Marginals : obj
        Marginal objects. See `inputs.Marginal`.
    Rosenblatt : bool
        If a Rosenblatt transformation is required for dependent input
        parameters.

    Examples
    --------
    Marginals can be defined as follows:

    >>> Inputs.add_marginals()
    >>> Inputs.Marginals[0].name = 'X_1'
    >>> Inputs.Marginals[0].dist_type = 'uniform'
    >>> Inputs.Marginals[0].parameters = [-5, 5]

    If no common distribution is available, the input data can be passed
    directly:

    >>> Inputs.add_marginals()
    >>> Inputs.Marginals[0].name = 'X_1'
    >>> Inputs.Marginals[0].input_data = input_data
    """

    poly_coeffs_flag = True

    def __init__(self):
        self.Marginals = []
        self.Rosenblatt = False

    def add_marginals(self):
        """
        Adds a new Marginal object to the input object.

        Returns
        -------
        None.
        """
        self.Marginals.append(Marginal())
Class variables
var poly_coeffs_flag
Methods
def add_marginals(self)
-
Adds a new Marginal object to the input object.
Returns
None.
class Discrepancy (InputDisc='', disc_type='Gaussian', parameters=None)
-
Discrepancy class for the Bayesian inference method. We define the reference or reality to be equal to what we can model plus a discrepancy term \( \epsilon \). We consider the following format:
$$\textbf{y}_{\text{reality}} = \mathcal{M}(\theta) + \epsilon,$$
where \( \epsilon \in \mathbb{R}^{N_{out}} \) represents the effects of measurement error and model inaccuracy. For simplicity, it can be defined as an additive Gaussian discrepancy with zero mean and a given covariance matrix \( \Sigma \):
$$\epsilon \sim \mathcal{N}(\epsilon|0, \Sigma).$$
In the context of model inversion or calibration, an observation point \( \textbf{y}_i \in \mathcal{Y} \) is a realization of a Gaussian distribution with mean value \( \mathcal{M}(\theta) \) and covariance matrix \( \Sigma \):
$$p(\textbf{y}|\theta) = \mathcal{N}(\textbf{y}|\mathcal{M}(\theta), \Sigma).$$
The following options are available:
-
Option A: With known residual covariance matrix \( \Sigma \) for independent measurements.
-
Option B: With unknown residual covariance matrix \( \Sigma \), parametrized as \( \Sigma(\theta_{\epsilon}) = \sigma^2 \textbf{I}_{N_{out}} \) with unknown residual variance \( \sigma^2 \). This term will be jointly inferred with the uncertain input parameters. For the inversion, you need to define a prior marginal via the Input class. Note that \( \sigma^2 \) is only a single scalar multiplier for the diagonal entries of the covariance matrix \( \Sigma \).
Attributes
InputDisc : obj
- Input object, used when \( \sigma^2 \) is expected to be inferred jointly with the parameters (Option B). If multiple output groups are defined by Model.Output.names, each model output needs to have a prior marginal defined via the Input class. The default is ''.
disc_type : str
- Type of the noise definition. 'Gaussian' is the only one supported so far.
parameters : dict or pandas.DataFrame
- Known residual variance \( \sigma^2 \), i.e. the diagonal entries of the covariance matrix of the multivariate normal likelihood, in case of Option A.
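A sketch of Option A with a known, homoscedastic measurement variance (the output name and numbers are hypothetical):

>>> obs_sigma = 0.05  # measurement standard deviation
>>> DiscrepancyOpts = Discrepancy(
...     disc_type='Gaussian',
...     parameters={'velocity': [obs_sigma**2] * 3})  # one variance per observation point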
class Discrepancy:
    """
    Discrepancy class for the Bayesian inference method.
    We define the reference or reality to be equal to what we can model
    plus a discrepancy term \\( \\epsilon \\). We consider the following
    format:

    $$\\textbf{y}_{\\text{reality}} = \\mathcal{M}(\\theta) + \\epsilon,$$

    where \\( \\epsilon \\in \\mathbb{R}^{N_{out}} \\) represents the
    effects of measurement error and model inaccuracy. For simplicity, it
    can be defined as an additive Gaussian discrepancy with zero mean and a
    given covariance matrix \\( \\Sigma \\):

    $$\\epsilon \\sim \\mathcal{N}(\\epsilon|0, \\Sigma).$$

    In the context of model inversion or calibration, an observation point
    \\( \\textbf{y}_i \\in \\mathcal{Y} \\) is a realization of a Gaussian
    distribution with mean value \\( \\mathcal{M}(\\theta) \\) and
    covariance matrix \\( \\Sigma \\):

    $$p(\\textbf{y}|\\theta) = \\mathcal{N}(\\textbf{y}|\\mathcal{M}
    (\\theta), \\Sigma).$$

    The following options are available:

    * Option A: With known residual covariance matrix \\(\\Sigma\\) for
    independent measurements.

    * Option B: With unknown residual covariance matrix \\(\\Sigma\\),
    parametrized as \\(\\Sigma(\\theta_{\\epsilon})=\\sigma^2
    \\textbf{I}_{N_{out}}\\) with unknown residual variances
    \\(\\sigma^2\\). This term will be jointly inferred with the uncertain
    input parameters. For the inversion, you need to define a prior
    marginal via the `Input` class. Note that \\(\\sigma^2\\) is only a
    single scalar multiplier for the diagonal entries of the covariance
    matrix \\(\\Sigma\\).

    Attributes
    ----------
    InputDisc : obj
        Input object, used when \\(\\sigma^2\\) is expected to be inferred
        jointly with the parameters (`Option B`). If multiple output groups
        are defined by `Model.Output.names`, each model output needs to
        have a prior marginal defined via the `Input` class. The default is
        `''`.
    disc_type : str
        Type of the noise definition. `'Gaussian'` is the only one
        supported so far.
    parameters : dict or pandas.DataFrame
        Known residual variance \\(\\sigma^2\\), i.e. the diagonal entries
        of the covariance matrix of the multivariate normal likelihood, in
        case of `Option A`.
    """

    def __init__(self, InputDisc='', disc_type='Gaussian', parameters=None):
        self.InputDisc = InputDisc
        self.disc_type = disc_type
        self.parameters = parameters

    # -------------------------------------------------------------------------
    def get_sample(self, n_samples):
        """
        Generates samples for \\(\\sigma^2\\), i.e. the diagonal entries of
        the variance-covariance matrix in the multivariate normal
        distribution.

        Parameters
        ----------
        n_samples : int
            Number of samples (parameter sets).

        Returns
        -------
        sigma2_prior : array of shape (n_samples, n_params)
            \\(\\sigma^2\\) samples.
        """
        self.n_samples = n_samples
        ExpDesign = ExpDesigns(self.InputDisc)
        self.sigma2_prior = ExpDesign.generate_ED(
            n_samples, sampling_method='random', max_pce_deg=1
        )
        # Store BoundTuples
        self.ExpDesign = ExpDesign

        # Naive approach: fit a Gaussian kernel to the provided data
        self.ExpDesign.JDist = stats.gaussian_kde(ExpDesign.raw_data)

        # Save the names of the sigmas
        if len(self.InputDisc.Marginals) != 0:
            self.name = []
            for Marginalidx in range(len(self.InputDisc.Marginals)):
                self.name.append(self.InputDisc.Marginals[Marginalidx].name)

        return self.sigma2_prior
Methods
def get_sample(self, n_samples)
-
Generates samples for \( \sigma^2 \), i.e. the diagonal entries of the variance-covariance matrix in the multivariate normal distribution.
Parameters
n_samples : int
- Number of samples (parameter sets).
Returns
sigma2_prior : array of shape (n_samples, n_params)
- \( \sigma^2 \) samples.
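For Option B, a prior for \( \sigma^2 \) is defined via the Input class and sampled with get_sample. A sketch with hypothetical names and bounds:

>>> sigma_prior = Input()
>>> sigma_prior.add_marginals()
>>> sigma_prior.Marginals[0].name = 'sigma2_velocity'
>>> sigma_prior.Marginals[0].dist_type = 'uniform'
>>> sigma_prior.Marginals[0].parameters = [0.0, 0.1]
>>> DiscrepancyOpts = Discrepancy(InputDisc=sigma_prior)
>>> sigma2_samples = DiscrepancyOpts.get_sample(1000)  # shape (1000, 1)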
class MetaModel (input_obj, meta_model_type='PCE', pce_reg_method='OLS', pce_deg=1, pce_q_norm=1.0, dim_red_method='no', verbose=False)
-
Meta (surrogate) model
This class trains a surrogate model. It accepts an input object (input_obj) containing the specification of the distributions for uncertain parameters and a model object with instructions on how to run the computational model.
Attributes
input_obj : obj
- Input object with the information on the model input parameters.
meta_model_type : str
- Surrogate model type. Three surrogate model types are supported: polynomial chaos expansion (PCE), arbitrary PCE (aPCE) and Gaussian process regression (GPE). Default is PCE.
pce_reg_method : str
- PCE regression method to compute the coefficients. The following regression methods are available:
- OLS: Ordinary Least Squares method
- BRR: Bayesian Ridge Regression
- LARS: Least Angle Regression
- ARD: Bayesian ARD Regression
- FastARD: Fast Bayesian ARD Regression
- VBL: Variational Bayesian Learning
- EBL: Empirical Bayesian Learning
Default is OLS.
pce_deg : int or list of int
- Polynomial degree(s). If a list is given, an adaptive algorithm is used to find the best degree with the lowest Leave-One-Out cross-validation (LOO) error (or the highest score = 1 - LOO). Default is 1.
pce_q_norm : float
- Hyperbolic (or q-norm) truncation for multi-indices of multivariate polynomials. Default is 1.0.
dim_red_method : str
- Dimensionality reduction method for the output space. The available method is based on principal component analysis (PCA). The default is 'no'. There are two ways to select the number of components: a percentage threshold of the explainable variance (between 0 and 100) (Option A) or direct prescription of the number of components (Option B):
>>> MetaModelOpts.dim_red_method = 'PCA'
>>> MetaModelOpts.var_pca_threshold = 99.999  # Option A
>>> MetaModelOpts.n_pca_components = 12  # Option B
verbose : bool
- Prints a summary of the regression results. Default is False.
Note
To define the sampling methods and the training set, an experimental design instance shall be defined. This can be done by:
>>> MetaModelOpts.add_ExpDesign()
Two experimental design schemes are supported: one-shot (normal) and adaptive sequential (sequential) designs. For the experimental design, refer to ExpDesigns.
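Putting the options together, a typical configuration might read as follows. This is a sketch built from the attributes documented above, reusing the hypothetical Inputs and model objects from the earlier examples:

>>> MetaModelOpts = MetaModel(Inputs, meta_model_type='PCE',
...                           pce_reg_method='FastARD',
...                           pce_deg=list(range(1, 6)),  # adaptive degree search
...                           pce_q_norm=0.85)
>>> MetaModelOpts.add_ExpDesign()
>>> MetaModelOpts.ExpDesign.method = 'normal'
>>> MetaModelOpts.ExpDesign.n_init_samples = 100
>>> MetaModelOpts.ExpDesign.sampling_method = 'random'
>>> surrogate = MetaModelOpts.create_metamodel(model)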
# Module-level imports assumed by this class (not shown in the original
# listing): os, h5py, numpy as np, scipy as sp, tqdm,
# sklearn.linear_model as lm, MinMaxScaler, Parallel/delayed (joblib),
# glexindex, eval_univ_basis, ExpDesigns and the custom regressors
# (RegressionFastARD, RegressionFastLaplace, VBLinearRegression,
# EBLinearRegression).

class MetaModel:
    """
    Meta (surrogate) model

    This class trains a surrogate model. It accepts an input object
    (input_obj) containing the specification of the distributions for
    uncertain parameters and a model object with instructions on how to run
    the computational model.

    Attributes
    ----------
    input_obj : obj
        Input object with the information on the model input parameters.
    meta_model_type : str
        Surrogate model type. Three surrogate model types are supported:
        polynomial chaos expansion (`PCE`), arbitrary PCE (`aPCE`) and
        Gaussian process regression (`GPE`). Default is PCE.
    pce_reg_method : str
        PCE regression method to compute the coefficients. The following
        regression methods are available:

        1. OLS: Ordinary Least Squares method
        2. BRR: Bayesian Ridge Regression
        3. LARS: Least Angle Regression
        4. ARD: Bayesian ARD Regression
        5. FastARD: Fast Bayesian ARD Regression
        6. VBL: Variational Bayesian Learning
        7. EBL: Empirical Bayesian Learning

        Default is `OLS`.
    pce_deg : int or list of int
        Polynomial degree(s). If a list is given, an adaptive algorithm is
        used to find the best degree with the lowest Leave-One-Out
        cross-validation (LOO) error (or the highest score=1-LOO). Default
        is `1`.
    pce_q_norm : float
        Hyperbolic (or q-norm) truncation for multi-indices of multivariate
        polynomials. Default is `1.0`.
    dim_red_method : str
        Dimensionality reduction method for the output space. The available
        method is based on principal component analysis (PCA). The default
        is `'no'`. There are two ways to select the number of components: a
        percentage threshold of the explainable variance (between 0 and
        100) (Option A) or direct prescription of the number of components
        (Option B):

        >>> MetaModelOpts.dim_red_method = 'PCA'
        >>> MetaModelOpts.var_pca_threshold = 99.999  # Option A
        >>> MetaModelOpts.n_pca_components = 12  # Option B

    verbose : bool
        Prints a summary of the regression results. Default is `False`.

    Note
    -------
    To define the sampling methods and the training set, an experimental
    design instance shall be defined. This can be done by:

    >>> MetaModelOpts.add_ExpDesign()

    Two experimental design schemes are supported: one-shot (`normal`) and
    adaptive sequential (`sequential`) designs. For the experimental
    design, refer to `ExpDesigns`.
    """

    def __init__(self, input_obj, meta_model_type='PCE',
                 pce_reg_method='OLS', pce_deg=1, pce_q_norm=1.0,
                 dim_red_method='no', verbose=False):

        self.input_obj = input_obj
        self.meta_model_type = meta_model_type
        self.pce_reg_method = pce_reg_method
        self.pce_deg = pce_deg
        self.pce_q_norm = pce_q_norm
        self.dim_red_method = dim_red_method
        self.verbose = verbose

    # -------------------------------------------------------------------------
    def create_metamodel(self, Model):
        """
        Starts the training of the meta-model for the model object
        containing the given computational model.

        Parameters
        ----------
        Model : obj
            Model object.

        Returns
        -------
        metamodel : obj
            The meta model object.
        """
        self.ModelObj = Model
        self.n_params = len(self.input_obj.Marginals)
        self.ExpDesignFlag = 'normal'
        # --- Prepare pce degree ---
        if self.meta_model_type.lower() == 'pce':
            if type(self.pce_deg) is not np.ndarray:
                self.pce_deg = np.array(self.pce_deg)

        if self.ExpDesign.method == 'sequential':
            from .sequential_design import SeqDesign
            seq_design = SeqDesign(self)
            metamodel = seq_design.train_seq_design(Model)

        elif self.ExpDesign.method == 'normal':
            self.ExpDesignFlag = 'normal'
            metamodel = self.train_norm_design(Model)

        else:
            raise Exception("The method for experimental design you "
                            "requested has not been implemented yet.")

        # Zip the model run directories
        if self.ModelObj.link_type.lower() == 'pylink':
            Model.zip_subdirs(Model.name, f'{Model.name}_')

        return metamodel

    # -------------------------------------------------------------------------
    def train_norm_design(self, Model, verbose=False):
        """
        This function loops over the outputs and each time step/point and
        fits the meta model.

        Parameters
        ----------
        Model : obj
            Model object.
        verbose : bool, optional
            Flag for a sequential design in silent mode. The default is
            False.

        Returns
        -------
        self : obj
            Meta-model object.
        """
        # Get the collocation points to run the forward model
        CollocationPoints, OutputDict = self.generate_ExpDesign(Model)

        # Initialize the nested dictionaries
        self.deg_dict = self.auto_vivification()
        self.q_norm_dict = self.auto_vivification()
        self.coeffs_dict = self.auto_vivification()
        self.basis_dict = self.auto_vivification()
        self.score_dict = self.auto_vivification()
        self.clf_poly = self.auto_vivification()
        self.gp_poly = self.auto_vivification()
        self.pca = self.auto_vivification()
        self.LCerror = self.auto_vivification()
        self.x_scaler = {}

        # Define the DegreeArray
        nSamples, ndim = CollocationPoints.shape
        self.DegreeArray = self.__select_degree(ndim, nSamples)

        # Generate all basis indices
        self.allBasisIndices = self.auto_vivification()
        for deg in self.DegreeArray:
            keys = self.allBasisIndices.keys()
            if deg not in np.fromiter(keys, dtype=float):
                # Generate the polynomial basis indices
                for qidx, q in enumerate(self.pce_q_norm):
                    basis_indices = self.create_basis_indices(degree=deg,
                                                              q_norm=q)
                    self.allBasisIndices[str(deg)][str(q)] = basis_indices

        # Evaluate the univariate polynomials on ExpDesign
        if self.meta_model_type.lower() != 'gpe':
            self.univ_p_val = self.univ_basis_vals(CollocationPoints)

        if 'x_values' in OutputDict:
            self.ExpDesign.x_values = OutputDict['x_values']
            del OutputDict['x_values']

        # --- Loop through data points and fit the surrogate ---
        if not verbose:
            print(f"\n>>>> Training the {self.meta_model_type} metamodel "
                  "started. <<<<<<\n")
            items = tqdm(OutputDict.items(), desc="Fitting regression")
        else:
            items = OutputDict.items()

        # For loop over the components/outputs
        for key, Output in items:

            # Dimensionality reduction with PCA, if specified
            if self.dim_red_method.lower() == 'pca':
                self.pca[key], target = self.pca_transformation(Output)
            else:
                target = Output

            # Parallel fit regression
            if self.meta_model_type.lower() == 'gpe':
                # Prepare the input matrix
                scaler = MinMaxScaler()
                X_S = scaler.fit_transform(CollocationPoints)

                self.x_scaler[key] = scaler

                out = Parallel(n_jobs=-1, prefer='threads')(
                    delayed(self.gaussian_process_emulator)(X_S,
                                                            target[:, idx])
                    for idx in range(target.shape[1]))

                for idx in range(target.shape[1]):
                    self.gp_poly[key][f"y_{idx+1}"] = out[idx]

            else:
                out = Parallel(n_jobs=-1, prefer='threads')(
                    delayed(self.adaptive_regression)(CollocationPoints,
                                                      target[:, idx], idx)
                    for idx in range(target.shape[1]))

                for i in range(target.shape[1]):
                    # Store the fitted variables per output component
                    self.deg_dict[key][f"y_{i+1}"] = out[i]['degree']
                    self.q_norm_dict[key][f"y_{i+1}"] = out[i]['qnorm']
                    self.coeffs_dict[key][f"y_{i+1}"] = out[i]['coeffs']
                    self.basis_dict[key][f"y_{i+1}"] = out[i]['multi_indices']
                    self.score_dict[key][f"y_{i+1}"] = out[i]['LOOCVScore']
                    self.clf_poly[key][f"y_{i+1}"] = out[i]['clf_poly']
                    self.LCerror[key][f"y_{i+1}"] = out[i]['LCerror']

        if not verbose:
            print(f"\n>>>> Training the {self.meta_model_type} metamodel"
                  " successfully completed. <<<<<<\n")

        return self

    # -------------------------------------------------------------------------
    def create_basis_indices(self, degree, q_norm):
        """
        Creates the set of selected multi-indices of multivariate
        polynomials for a given number of parameters, polynomial degree and
        hyperbolic (or q-norm) truncation scheme.

        Parameters
        ----------
        degree : int
            Polynomial degree.
        q_norm : float
            Hyperbolic (or q-norm) truncation.

        Returns
        -------
        basis_indices : array of shape (n_terms, n_params)
            Multi-indices of multivariate polynomials.
        """
        basis_indices = glexindex(start=0, stop=degree+1,
                                  dimensions=self.n_params,
                                  cross_truncation=q_norm,
                                  reverse=False, graded=True)
        return basis_indices

    # -------------------------------------------------------------------------
    def add_ExpDesign(self):
        """
        Instantiates the experimental design object.

        Returns
        -------
        None.
        """
        self.ExpDesign = ExpDesigns(self.input_obj,
                                    meta_Model=self.meta_model_type)

    # -------------------------------------------------------------------------
    def generate_ExpDesign(self, Model):
        """
        Prepares the experimental design either by reading from the
        prescribed data or by running simulations.

        Parameters
        ----------
        Model : obj
            Model object.

        Raises
        ------
        Exception
            If model simulations are not provided properly.

        Returns
        -------
        ED_X_tr : array of shape (n_samples, n_params)
            Training samples transformed by an isoprobabilistic
            transformation.
        ED_Y : dict
            Model simulations (target) for all outputs.
        """
        ExpDesign = self.ExpDesign
        if self.ExpDesignFlag != 'sequential':
            # Read ExpDesign (training and targets) from the provided hdf5
            if ExpDesign.hdf5_file is not None:
                # Read hdf5 file
                f = h5py.File(ExpDesign.hdf5_file, 'r+')

                # Read EDX and pass it to the ExpDesign object
                try:
                    ExpDesign.X = np.array(f["EDX/New_init_"])
                except KeyError:
                    ExpDesign.X = np.array(f["EDX/init_"])

                # Update number of initial samples
                ExpDesign.n_init_samples = ExpDesign.X.shape[0]

                # Read EDY and pass it to the ExpDesign object
                out_names = self.ModelObj.Output.names
                ExpDesign.Y = {}

                # Extract x values
                try:
                    ExpDesign.Y["x_values"] = dict()
                    for varIdx, var in enumerate(out_names):
                        x = np.array(f[f"x_values/{var}"])
                        ExpDesign.Y["x_values"][var] = x
                except KeyError:
                    ExpDesign.Y["x_values"] = np.array(f["x_values"])

                # Store the output
                for varIdx, var in enumerate(out_names):
                    try:
                        y = np.array(f[f"EDY/{var}/New_init_"])
                    except KeyError:
                        y = np.array(f[f"EDY/{var}/init_"])
                    ExpDesign.Y[var] = y
                f.close()
            else:
                # Check if an old hdf5 file exists: if yes, rename it
                hdf5file = f'ExpDesign_{self.ModelObj.name}.hdf5'
                if os.path.exists(hdf5file):
                    os.rename(hdf5file, 'old_'+hdf5file)

        # ---- Prepare X samples ----
        ED_X, ED_X_tr = ExpDesign.generate_ED(ExpDesign.n_init_samples,
                                              ExpDesign.sampling_method,
                                              transform=True,
                                              max_pce_deg=np.max(self.pce_deg))
        ExpDesign.X = ED_X
        ExpDesign.collocationPoints = ED_X_tr
        self.bound_tuples = ExpDesign.bound_tuples

        # ---- Run simulations at X ----
        if not hasattr(ExpDesign, 'Y') or ExpDesign.Y is None:
            print('\n Now the forward model needs to be run!\n')
            ED_Y, up_ED_X = Model.run_model_parallel(ED_X)
            ExpDesign.X = up_ED_X
            self.ModelOutputDict = ED_Y
            ExpDesign.Y = ED_Y
        else:
            # Check if a dict has been passed.
            if type(ExpDesign.Y) is dict:
                self.ModelOutputDict = ExpDesign.Y
            else:
                raise Exception('Please provide either a dictionary or an '
                                'hdf5 file to the ExpDesign.hdf5_file '
                                'argument.')

        return ED_X_tr, self.ModelOutputDict

    # -------------------------------------------------------------------------
    def univ_basis_vals(self, samples, n_max=None):
        """
        Evaluates univariate regressors along input directions.

        Parameters
        ----------
        samples : array of shape (n_samples, n_params)
            Samples.
        n_max : int, optional
            Maximum polynomial degree. The default is `None`.

        Returns
        -------
        univ_basis : array of shape (n_samples, n_params, n_max+1)
            All univariate regressors up to n_max.
        """
        # Extract information
        poly_types = self.ExpDesign.poly_types
        if samples.ndim != 2:
            samples = samples.reshape(1, len(samples))
        n_max = np.max(self.pce_deg) if n_max is None else n_max

        # Extract poly coeffs
        if self.ExpDesign.input_data_given or self.ExpDesign.apce:
            apolycoeffs = self.ExpDesign.polycoeffs
        else:
            apolycoeffs = None

        # Evaluate the univariate basis
        univ_basis = eval_univ_basis(samples, n_max, poly_types, apolycoeffs)

        return univ_basis

    # -------------------------------------------------------------------------
    def create_psi(self, basis_indices, univ_p_val):
        """
        This function assembles the design matrix Psi from the given basis
        index set INDICES and the univariate polynomial evaluations
        univ_p_val.

        Parameters
        ----------
        basis_indices : array of shape (n_terms, n_params)
            Multi-indices of multivariate polynomials.
        univ_p_val : array of (n_samples, n_params, n_max+1)
            All univariate regressors up to `n_max`.

        Raises
        ------
        ValueError
            If n_terms in the arguments do not match.

        Returns
        -------
        psi : array of shape (n_samples, n_terms)
            Multivariate regressors.
        """
        # Check if BasisIndices is a sparse matrix
        sparsity = sp.sparse.issparse(basis_indices)
        if sparsity:
            basis_indices = basis_indices.toarray()

        # Initialization and consistency checks
        # Number of input variables
        n_params = univ_p_val.shape[1]

        # Size of the experimental design
        n_samples = univ_p_val.shape[0]

        # Number of basis terms
        n_terms = basis_indices.shape[0]

        # Check that the variables have consistent sizes
        if n_params != basis_indices.shape[1]:
            raise ValueError("The shapes of basis_indices and univ_p_val "
                             "don't match!!")

        # Preallocate the Psi matrix for performance
        psi = np.ones((n_samples, n_terms))

        # Assemble the Psi matrix
        for m in range(basis_indices.shape[1]):
            aa = np.where(basis_indices[:, m] > 0)[0]
            try:
                basisIdx = basis_indices[aa, m]
                bb = np.reshape(univ_p_val[:, m, basisIdx], psi[:, aa].shape)
                psi[:, aa] = np.multiply(psi[:, aa], bb)
            except ValueError as err:
                raise err

        return psi

    # -------------------------------------------------------------------------
    def fit(self, X, y, basis_indices, reg_method=None):
        """
        Fit regression using the regression method provided.

        Parameters
        ----------
        X : array of shape (n_samples, n_features)
            Training vector, where n_samples is the number of samples and
            n_features is the number of features.
        y : array of shape (n_samples,)
            Target values.
        basis_indices : array of shape (n_terms, n_params)
            Multi-indices of multivariate polynomials.
        reg_method : str, optional
            Regression method to use; if None, `pce_reg_method` is applied.
            The default is None.

        Returns
        -------
        return_out_dict : Dict
            Fitted estimator, sparse multi-index, sparse X and
            coefficients.
        """
        if reg_method is None:
            reg_method = self.pce_reg_method

        # Check if BasisIndices is a sparse matrix
        sparsity = sp.sparse.issparse(basis_indices)

        clf_poly = []
        compute_score = True if self.verbose else False

        # Inverse of the observed variance of the data
        if np.var(y) != 0:
            Lambda = 1 / np.var(y)
        else:
            Lambda = 1e-6

        # Bayes sparse adaptive aPCE
        if reg_method.lower() != 'ols':
            if reg_method.lower() == 'brr' or np.var(y) == 0:
                clf_poly = lm.BayesianRidge(n_iter=1000, tol=1e-7,
                                            fit_intercept=True,
                                            normalize=True,
                                            compute_score=compute_score,
                                            alpha_1=1e-04, alpha_2=1e-04,
                                            lambda_1=Lambda, lambda_2=Lambda)
                clf_poly.converged = True

            elif reg_method.lower() == 'ard':
                clf_poly = lm.ARDRegression(fit_intercept=True,
                                            normalize=True,
                                            compute_score=compute_score,
                                            n_iter=1000, tol=0.0001,
                                            alpha_1=1e-3, alpha_2=1e-3,
                                            lambda_1=Lambda, lambda_2=Lambda)

            elif reg_method.lower() == 'fastard':
                clf_poly = RegressionFastARD(fit_intercept=True,
                                             normalize=True,
                                             compute_score=compute_score,
                                             n_iter=300, tol=1e-10)

            elif reg_method.lower() == 'bcs':
                clf_poly = RegressionFastLaplace(fit_intercept=False,
                                                 n_iter=1000, tol=1e-7)

            elif reg_method.lower() == 'lars':
                clf_poly = lm.LassoLarsCV(fit_intercept=False)

            elif reg_method.lower() == 'sgdr':
                clf_poly = lm.SGDRegressor(fit_intercept=False,
                                           max_iter=5000, tol=1e-7)

            elif reg_method.lower() == 'omp':
                clf_poly = lm.OrthogonalMatchingPursuitCV(fit_intercept=False,
                                                          max_iter=10)

            elif reg_method.lower() == 'vbl':
                clf_poly = VBLinearRegression(fit_intercept=False)

            elif reg_method.lower() == 'ebl':
                clf_poly = EBLinearRegression(optimizer='em')

            # Fit
            clf_poly.fit(X, y)

            # Select the nonzero entries of coefficients
            # The first column must be kept (for mean calculations)
            nnz_idx = np.nonzero(clf_poly.coef_)[0]
            if len(nnz_idx) == 0 or nnz_idx[0] != 0:
                nnz_idx = np.insert(np.nonzero(clf_poly.coef_)[0], 0, 0)

            # Remove the zero entries for Bases and PSI if need be
            if sparsity:
                sparse_basis_indices = basis_indices.toarray()[nnz_idx]
            else:
sparse_basis_indices = basis_indices[nnz_idx] sparse_X = X[:, nnz_idx] # Store the coefficients of the regression model clf_poly.fit(sparse_X, y) coeffs = clf_poly.coef_ else: # This is for the case where all outputs are zero, thereby # all coefficients are zero if sparsity: sparse_basis_indices = basis_indices.toarray() else: sparse_basis_indices = basis_indices sparse_X = X coeffs = clf_poly.coef_ # Ordinary least square method (OLS) else: if sparsity: sparse_basis_indices = basis_indices.toarray() else: sparse_basis_indices = basis_indices sparse_X = X X_T_X = np.dot(sparse_X.T, sparse_X) if np.linalg.cond(X_T_X) > 1e-12 and \ np.linalg.cond(X_T_X) < 1 / sys.float_info.epsilon: # faster coeffs = sp.linalg.solve(X_T_X, np.dot(sparse_X.T, y)) else: # stabler coeffs = np.dot(np.dot(np.linalg.pinv(X_T_X), sparse_X.T), y) # Create a dict to pass the outputs return_out_dict = dict() return_out_dict['clf_poly'] = clf_poly return_out_dict['spareMulti-Index'] = sparse_basis_indices return_out_dict['sparePsi'] = sparse_X return_out_dict['coeffs'] = coeffs return return_out_dict # -------------------------------------------------------------------------------------------------------- def adaptive_regression(self, ED_X, ED_Y, varIdx, verbose=False): """ Adaptively fits the PCE model by comparing the scores of different degrees and q-norm. Parameters ---------- ED_X : array of shape (n_samples, n_params) Experimental design. ED_Y : array of shape (n_samples,) Target values, i.e. simulation results for the Experimental design. varIdx : int Index of the output. verbose : bool, optional Print out summary. The default is False. Returns ------- returnVars : Dict Fitted estimator, best degree, best q-norm, LOOCVScore and coefficients. """ NrSamples, n_params = ED_X.shape # Initialization qAllCoeffs, AllCoeffs = {}, {} qAllIndices_Sparse, AllIndices_Sparse = {}, {} qAllclf_poly, Allclf_poly = {}, {} qAllnTerms, AllnTerms = {}, {} qAllLCerror, AllLCerror = {}, {} # Extract degree array and qnorm array DegreeArray = np.array([*self.allBasisIndices], dtype=int) qnorm = [*self.allBasisIndices[str(int(DegreeArray[0]))]] # Some options for EarlyStop errorIncreases = False # Stop degree, if LOO error does not decrease n_checks_degree times n_checks_degree = 3 # Stop qNorm, if criterion isn't fulfilled n_checks_qNorm times n_checks_qNorm = 2 nqnorms = len(qnorm) qNormEarlyStop = True if nqnorms < n_checks_qNorm+1: qNormEarlyStop = False # ===================================================================== # basis adaptive polynomial chaos: repeat the calculation by increasing # polynomial degree until the highest accuracy is reached # ===================================================================== # For each degree check all q-norms and choose the best one scores = -np.inf * np.ones(DegreeArray.shape[0]) qNormScores = -np.inf * np.ones(nqnorms) for degIdx, deg in enumerate(DegreeArray): for qidx, q in enumerate(qnorm): # Extract the polynomial basis indices from the pool of # allBasisIndices BasisIndices = self.allBasisIndices[str(deg)][str(q)] # Assemble the Psi matrix Psi = self.create_psi(BasisIndices, self.univ_p_val) # Calulate the cofficients of the meta model outs = self.fit(Psi, ED_Y, BasisIndices) # Calculate and save the score of LOOCV score, LCerror = self.corr_loocv_error(outs['clf_poly'], outs['sparePsi'], outs['coeffs'], ED_Y) # Check the convergence of noise for FastARD if self.pce_reg_method == 'FastARD' and \ outs['clf_poly'].alpha_ < np.finfo(np.float32).eps: score = -np.inf 
qNormScores[qidx] = score qAllCoeffs[str(qidx+1)] = outs['coeffs'] qAllIndices_Sparse[str(qidx+1)] = outs['spareMulti-Index'] qAllclf_poly[str(qidx+1)] = outs['clf_poly'] qAllnTerms[str(qidx+1)] = BasisIndices.shape[0] qAllLCerror[str(qidx+1)] = LCerror # EarlyStop check # if there are at least n_checks_qNorm entries after the # best one, we stop if qNormEarlyStop and \ sum(np.isfinite(qNormScores)) > n_checks_qNorm: # If the error has increased the last two iterations, stop! qNormScores_nonInf = qNormScores[np.isfinite(qNormScores)] deltas = np.sign(np.diff(qNormScores_nonInf)) if sum(deltas[-n_checks_qNorm+1:]) == 2: # stop the q-norm loop here break if np.var(ED_Y) == 0: break # Store the score in the scores list best_q = np.nanargmax(qNormScores) scores[degIdx] = qNormScores[best_q] AllCoeffs[str(degIdx+1)] = qAllCoeffs[str(best_q+1)] AllIndices_Sparse[str(degIdx+1)] = qAllIndices_Sparse[str(best_q+1)] Allclf_poly[str(degIdx+1)] = qAllclf_poly[str(best_q+1)] AllnTerms[str(degIdx+1)] = qAllnTerms[str(best_q+1)] AllLCerror[str(degIdx+1)] = qAllLCerror[str(best_q+1)] # Check the direction of the error (on average): # if it increases consistently stop the iterations if len(scores[scores != -np.inf]) > n_checks_degree: scores_nonInf = scores[scores != -np.inf] ss = np.sign(scores_nonInf - np.max(scores_nonInf)) # ss<0 error decreasing errorIncreases = np.sum(np.sum(ss[-2:])) <= -1*n_checks_degree if errorIncreases: break # Check only one degree, if target matrix has zero variance if np.var(ED_Y) == 0: break # ------------------ Summary of results ------------------ # Select the one with the best score and save the necessary outputs best_deg = np.nanargmax(scores)+1 coeffs = AllCoeffs[str(best_deg)] basis_indices = AllIndices_Sparse[str(best_deg)] clf_poly = Allclf_poly[str(best_deg)] LOOCVScore = np.nanmax(scores) P = AllnTerms[str(best_deg)] LCerror = AllLCerror[str(best_deg)] degree = DegreeArray[np.nanargmax(scores)] qnorm = float(qnorm[best_q]) # ------------------ Print out Summary of results ------------------ if self.verbose: # Create PSI_Sparse by removing redundent terms nnz_idx = np.nonzero(coeffs)[0] BasisIndices_Sparse = basis_indices[nnz_idx] print(f'Output variable {varIdx+1}:') print('The estimation of PCE coefficients converged at polynomial ' f'degree {DegreeArray[best_deg-1]} with ' f'{len(BasisIndices_Sparse)} terms (Sparsity index = ' f'{round(len(BasisIndices_Sparse)/P, 3)}).') print(f'Final ModLOO error estimate: {1-max(scores):.3e}') print('\n'+'-'*50) if verbose: print('='*50) print(' '*10 + ' Summary of results ') print('='*50) print("scores:\n", scores) print("Best score's degree:", self.DegreeArray[best_deg-1]) print("NO. 
of terms:", len(basis_indices)) print("Sparsity index:", round(len(basis_indices)/P, 3)) print("Best Indices:\n", basis_indices) if self.pce_reg_method in ['BRR', 'ARD']: fig, ax = plt.subplots(figsize=(12, 10)) plt.title("Marginal log-likelihood") plt.plot(clf_poly.scores_, color='navy', linewidth=2) plt.ylabel("Score") plt.xlabel("Iterations") if self.pce_reg_method.lower() == 'bbr': text = f"$\\alpha={clf_poly.alpha_:.1f}$\n" f"$\\lambda={clf_poly.lambda_:.3f}$\n" f"$L={clf_poly.scores_[-1]:.1f}$" else: text = f"$\\alpha={clf_poly.alpha_:.1f}$\n$" f"\\L={clf_poly.scores_[-1]:.1f}$" plt.text(0.75, 0.5, text, fontsize=18, transform=ax.transAxes) plt.show() print('='*80) # Create a dict to pass the outputs returnVars = dict() returnVars['clf_poly'] = clf_poly returnVars['degree'] = degree returnVars['qnorm'] = qnorm returnVars['coeffs'] = coeffs returnVars['multi_indices'] = basis_indices returnVars['LOOCVScore'] = LOOCVScore returnVars['LCerror'] = LCerror return returnVars # ------------------------------------------------------------------------- def corr_loocv_error(self, clf, psi, coeffs, y): """ Calculates the corrected LOO error for regression on regressor matrix `psi` that generated the coefficients based on [1] and [2]. [1] Blatman, G., 2009. Adaptive sparse polynomial chaos expansions for uncertainty propagation and sensitivity analysis (Doctoral dissertation, Clermont-Ferrand 2). [2] Blatman, G. and Sudret, B., 2011. Adaptive sparse polynomial chaos expansion based on least angle regression. Journal of computational Physics, 230(6), pp.2345-2367. Parameters ---------- clf : object Fitted estimator. psi : array of shape (n_samples, n_features) The multivariate orthogonal polynomials (regressor). coeffs : array-like of shape (n_features,) Estimated cofficients. y : array of shape (n_samples,) Target values. Returns ------- Q_2 : float LOOCV Validation score (1-LOOCV erro). residual : array of shape (n_samples,) Residual values (y - predicted targets). 
""" psi = np.array(psi, dtype=float) # Create PSI_Sparse by removing redundent terms nnz_idx = np.nonzero(coeffs)[0] if len(nnz_idx) == 0: nnz_idx = [0] psi_sparse = psi[:, nnz_idx] # NrCoeffs of aPCEs P = len(nnz_idx) # NrEvaluation (Size of experimental design) N = psi.shape[0] # Build the projection matrix PsiTPsi = np.dot(psi_sparse.T, psi_sparse) if np.linalg.cond(PsiTPsi) > 1e-12 and \ np.linalg.cond(PsiTPsi) < 1/sys.float_info.epsilon: # faster M = sp.linalg.solve(PsiTPsi, sp.sparse.eye(PsiTPsi.shape[0]).toarray()) else: # stabler M = np.linalg.pinv(PsiTPsi) # h factor (the full matrix is not calculated explicitly, # only the trace is, to save memory) PsiM = np.dot(psi_sparse, M) h = np.sum(np.multiply(PsiM, psi_sparse), axis=1, dtype=np.float128) # ------ Calculate Error Loocv for each measurement point ---- # Residuals if isinstance(clf, list): residual = np.dot(psi, coeffs) - y else: residual = clf.predict(psi) - y # Variance varY = np.var(y) if varY == 0: normEmpErr = 0 ErrLoo = 0 LCerror = np.zeros((y.shape)) else: normEmpErr = np.mean(residual**2)/varY # LCerror = np.divide(residual, (1-h)) LCerror = residual / (1-h)[:, np.newaxis] ErrLoo = np.mean(np.square(LCerror)) / varY # if there are NaNs, just return an infinite LOO error (this # happens, e.g., when a strongly underdetermined problem is solved) if np.isnan(ErrLoo): ErrLoo = np.inf # Corrected Error for over-determined system trM = np.trace(M) if trM < 0 or abs(trM) > 1e6: trM = np.trace(np.linalg.pinv(np.dot(psi.T, psi))) # Over-determined system of Equation if N > P: T_factor = N/(N-P) * (1 + trM) # Under-determined system of Equation else: T_factor = np.inf CorrectedErrLoo = ErrLoo * T_factor Q_2 = 1 - CorrectedErrLoo return Q_2, residual # ------------------------------------------------------------------------- def pca_transformation(self, Output): """ Transforms the targets (outputs) via Principal Component Analysis Parameters ---------- Output : array of shape (n_samples,) Target values. Returns ------- pca : obj Fitted sklearnPCA object. OutputMatrix : array of shape (n_samples,) Transformed target values. """ # Transform via Principal Component Analysis if hasattr(self, 'var_pca_threshold'): var_pca_threshold = self.var_pca_threshold else: var_pca_threshold = 100.0 n_samples, n_features = Output.shape if hasattr(self, 'n_pca_components'): n_pca_components = self.n_pca_components else: # Instantiate and fit sklearnPCA object covar_matrix = sklearnPCA(n_components=None) covar_matrix.fit(Output) var = np.cumsum(np.round(covar_matrix.explained_variance_ratio_, decimals=5)*100) # Find the number of components to explain self.varPCAThreshold of # variance try: n_components = np.where(var >= var_pca_threshold)[0][0] + 1 except IndexError: n_components = min(n_samples, n_features) n_pca_components = min(n_samples, n_features, n_components) # Print out a report print() print('-' * 50) print(f"PCA transformation is performed with {n_pca_components}" " components.") print('-' * 50) print() # Fit and transform with the selected number of components pca = sklearnPCA(n_components=n_pca_components, svd_solver='randomized') OutputMatrix = pca.fit_transform(Output) return pca, OutputMatrix # ------------------------------------------------------------------------- def gaussian_process_emulator(self, X, y, nug_term=None, autoSelect=False, varIdx=None): """ Fits a Gaussian Process Emulator to the target given the training points. Parameters ---------- X : array of shape (n_samples, n_params) Training points. 
y : array of shape (n_samples,) Target values. nug_term : float, optional Nugget term. The default is None, i.e. variance of y. autoSelect : bool, optional Loop over some kernels and select the best. The default is False. varIdx : int, optional The index number. The default is None. Returns ------- gp : object Fitted estimator. """ nug_term = nug_term if nug_term else np.var(y) Kernels = [nug_term * kernels.RBF(length_scale=1.0, length_scale_bounds=(1e-25, 1e15)), nug_term * kernels.RationalQuadratic(length_scale=0.2, alpha=1.0), nug_term * kernels.Matern(length_scale=1.0, length_scale_bounds=(1e-15, 1e5), nu=1.5)] # Automatic selection of the kernel if autoSelect: gp = {} BME = [] for i, kernel in enumerate(Kernels): gp[i] = GaussianProcessRegressor(kernel=kernel, n_restarts_optimizer=3, normalize_y=False) # Fit to data using Maximum Likelihood Estimation gp[i].fit(X, y) # Store the MLE as BME score BME.append(gp[i].log_marginal_likelihood()) gp = gp[np.argmax(BME)] else: gp = GaussianProcessRegressor(kernel=Kernels[0], n_restarts_optimizer=3, normalize_y=False) gp.fit(X, y) # Compute score if varIdx is not None: Score = gp.score(X, y) print('-'*50) print(f'Output variable {varIdx}:') print('The estimation of GPE coefficients converged,') print(f'with the R^2 score: {Score:.3f}') print('-'*50) return gp # ------------------------------------------------------------------------- def eval_metamodel(self, samples=None, nsamples=None, sampling_method='random', return_samples=False): """ Evaluates meta-model at the requested samples. One can also generate nsamples. Parameters ---------- samples : array of shape (n_samples, n_params), optional Samples to evaluate meta-model at. The default is None. nsamples : int, optional Number of samples to generate, if no `samples` is provided. The default is None. sampling_method : str, optional Type of sampling, if no `samples` is provided. The default is 'random'. return_samples : bool, optional Retun samples, if no `samples` is provided. The default is False. Returns ------- mean_pred : dict Mean of the predictions. std_pred : dict Standard deviatioon of the predictions. 
""" if self.meta_model_type.lower() == 'gpe': model_dict = self.gp_poly else: model_dict = self.coeffs_dict if samples is None: if nsamples is None: self.n_samples = 100000 else: self.n_samples = nsamples samples = self.ExpDesign.generate_samples(self.n_samples, sampling_method) else: self.samples = samples self.n_samples = len(samples) # Transform samples samples = self.ExpDesign.transform(samples) if self.meta_model_type.lower() != 'gpe': univ_p_val = self.univ_basis_vals(samples, n_max=np.max(self.pce_deg)) mean_pred = {} std_pred = {} # Loop over outputs for ouput, values in model_dict.items(): mean = np.zeros((len(samples), len(values))) std = np.zeros((len(samples), len(values))) idx = 0 for in_key, InIdxValues in values.items(): # Perdiction with GPE if self.meta_model_type.lower() == 'gpe': X_T = self.x_scaler[ouput].transform(samples) gp = self.gp_poly[ouput][in_key] y_mean, y_std = gp.predict(X_T, return_std=True) else: # Perdiction with PCE or pcekriging # Assemble Psi matrix psi = self.create_psi(self.basis_dict[ouput][in_key], univ_p_val) # Perdiction try: # with error bar clf_poly = self.clf_poly[ouput][in_key] y_mean, y_std = clf_poly.predict(psi, return_std=True) except: # without error bar coeffs = self.coeffs_dict[ouput][in_key] y_mean = np.dot(psi, coeffs) y_std = np.zeros_like(y_mean) mean[:, idx] = y_mean std[:, idx] = y_std idx += 1 if self.dim_red_method.lower() == 'pca': PCA = self.pca[ouput] mean_pred[ouput] = PCA.mean_ + np.dot(mean, PCA.components_) std_pred[ouput] = np.sqrt(np.dot(std**2, PCA.components_**2)) else: mean_pred[ouput] = mean std_pred[ouput] = std if return_samples: return mean_pred, std_pred, samples else: return mean_pred, std_pred # ------------------------------------------------------------------------- def create_model_error(self, X, y, name='Calib'): """ Fits a GPE-based model error. Parameters ---------- X : array of shape (n_outputs, n_inputs) Input array. It can contain any forcing inputs or coordinates of extracted data. y : array of shape (n_outputs,) The model response for the MAP parameter set. name : str, optional Calibration or validation. The default is `'Calib'`. Returns ------- self: object Self object. """ Model = self.ModelObj outputNames = Model.Output.Names self.errorRegMethod = 'GPE' self.errorclf_poly = self.auto_vivification() self.errorScale = self.auto_vivification() # Read data MeasuredData = Model.read_observation(case=name) # Fitting GPR based bias model for out in outputNames: nan_idx = ~np.isnan(MeasuredData[out]) # Select data try: data = MeasuredData[out].values[nan_idx] except AttributeError: data = MeasuredData[out][nan_idx] # Prepare the input matrix scaler = MinMaxScaler() delta = data # - y[out][0] BiasInputs = np.hstack((X[out], y[out].reshape(-1, 1))) X_S = scaler.fit_transform(BiasInputs) gp = self.gaussian_process_emulator(X_S, delta) self.errorScale[out]["y_1"] = scaler self.errorclf_poly[out]["y_1"] = gp return self # ------------------------------------------------------------------------- def eval_model_error(self, X, y_pred): """ Evaluates the error model. Parameters ---------- X : array Inputs. y_pred : dict Predictions. Returns ------- mean_pred : dict Mean predition of the GPE-based error model. std_pred : dict standard deviation of the GPE-based error model. 
""" mean_pred = {} std_pred = {} for Outkey, ValuesDict in self.errorclf_poly.items(): pred_mean = np.zeros_like(y_pred[Outkey]) pred_std = np.zeros_like(y_pred[Outkey]) for Inkey, InIdxValues in ValuesDict.items(): gp = self.errorclf_poly[Outkey][Inkey] scaler = self.errorScale[Outkey][Inkey] # Transform Samples using scaler for j, pred in enumerate(y_pred[Outkey]): BiasInputs = np.hstack((X[Outkey], pred.reshape(-1, 1))) Samples_S = scaler.transform(BiasInputs) y_hat, y_std = gp.predict(Samples_S, return_std=True) pred_mean[j] = y_hat pred_std[j] = y_std # pred_mean[j] += pred mean_pred[Outkey] = pred_mean std_pred[Outkey] = pred_std return mean_pred, std_pred # ------------------------------------------------------------------------- class auto_vivification(dict): """ Implementation of perl's AutoVivification feature. Source: https://stackoverflow.com/a/651879/18082457 """ def __getitem__(self, item): try: return dict.__getitem__(self, item) except KeyError: value = self[item] = type(self)() return value # ------------------------------------------------------------------------- def __select_degree(self, ndim, nSamples): """ Selects degree based on the number of samples and parameters in the sequential design. Parameters ---------- ndim : TYPE DESCRIPTION. nSamples : TYPE DESCRIPTION. Returns ------- TYPE DESCRIPTION. """ # Define the DegreeArray max_deg = np.max(self.pce_deg) min_Deg = np.min(self.pce_deg) nitr = nSamples - self.ExpDesign.n_init_samples # Check q-norm if not np.isscalar(self.pce_q_norm): self.pce_q_norm = np.array(self.pce_q_norm) else: self.pce_q_norm = np.array([self.pce_q_norm]) def M_uptoMax(maxDeg): n_combo = np.zeros(maxDeg) for i, d in enumerate(range(1, maxDeg+1)): n_combo[i] = math.factorial(ndim+d) n_combo[i] /= math.factorial(ndim) * math.factorial(d) return n_combo if self.ExpDesignFlag != 'sequential': degNew = max_deg else: d = nitr if nitr != 0 and self.n_params > 5 else 1 min_index = np.argmin(abs(M_uptoMax(max_deg)-ndim*nSamples*d)) degNew = range(1, max_deg+1)[min_index] if degNew > min_Deg and self.pce_reg_method.lower() != 'fastard': DegreeArray = np.arange(min_Deg, degNew+1) else: DegreeArray = np.array([degNew]) return DegreeArray
Class variables
var auto_vivification
-
Implementation of Perl's AutoVivification feature.
Methods
def create_metamodel(self, Model)
-
Starts the training of the meta-model for the given computational model.
Parameters
Model
:obj
- Model object.
Returns
metamodel
:obj
- The trained meta-model object.
Expand source code
def create_metamodel(self, Model):
    """
    Starts the training of the meta-model for the given computational
    model.

    Parameters
    ----------
    Model : obj
        Model object.

    Returns
    -------
    metamodel : obj
        The trained meta-model object.
    """
    self.ModelObj = Model
    self.n_params = len(self.input_obj.Marginals)
    self.ExpDesignFlag = 'normal'

    # --- Prepare pce degree ---
    if self.meta_model_type.lower() == 'pce':
        if type(self.pce_deg) is not np.ndarray:
            self.pce_deg = np.array(self.pce_deg)

    if self.ExpDesign.method == 'sequential':
        from .sequential_design import SeqDesign
        seq_design = SeqDesign(self)
        metamodel = seq_design.train_seq_design(Model)
    elif self.ExpDesign.method == 'normal':
        self.ExpDesignFlag = 'normal'
        metamodel = self.train_norm_design(Model)
    else:
        raise Exception("The method for experimental design you requested"
                        " has not been implemented yet.")

    # Zip the model run directories
    if self.ModelObj.link_type.lower() == 'pylink':
        Model.zip_subdirs(Model.name, f'{Model.name}_')

    return metamodel
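A minimal usage sketch is given below. It assumes an Input object inputs and a configured forward model Model already exist; the MetaModel(inputs) constructor call and the attribute values shown are illustrative assumptions, not prescriptions.

import numpy as np
from bayesvalidrox import MetaModel

metamodel = MetaModel(inputs)            # constructor signature assumed
metamodel.meta_model_type = 'PCE'
metamodel.pce_reg_method = 'FastARD'     # one of the methods accepted by fit()
metamodel.pce_deg = np.arange(1, 6)      # candidate degrees for the adaptive search
metamodel.pce_q_norm = 0.75              # hyperbolic truncation factor
metamodel.add_ExpDesign()
metamodel.ExpDesign.method = 'normal'    # or 'sequential'
metamodel.ExpDesign.n_init_samples = 100
trained = metamodel.create_metamodel(Model)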
def train_norm_design(self, Model, verbose=False)
-
Loops over the outputs and each time step/point and fits the meta-model.
Parameters
Model
:obj
- Model object.
verbose
:bool
, optional- Flag used by the sequential design to run the training in silent mode. The default is False.
Returns
self
:obj
- Meta-model object.
Expand source code
def train_norm_design(self, Model, verbose=False): """ This function loops over the outputs and each time step/point and fits the meta model. Parameters ---------- Model : obj Model object. verbose : bool, optional Flag for a sequential design in silent mode. The default is False. Returns ------- self: obj Meta-model object. """ # Get the collocation points to run the forward model CollocationPoints, OutputDict = self.generate_ExpDesign(Model) # Initialize the nested dictionaries self.deg_dict = self.auto_vivification() self.q_norm_dict = self.auto_vivification() self.coeffs_dict = self.auto_vivification() self.basis_dict = self.auto_vivification() self.score_dict = self.auto_vivification() self.clf_poly = self.auto_vivification() self.gp_poly = self.auto_vivification() self.pca = self.auto_vivification() self.LCerror = self.auto_vivification() self.x_scaler = {} # Define the DegreeArray nSamples, ndim = CollocationPoints.shape self.DegreeArray = self.__select_degree(ndim, nSamples) # Generate all basis indices self.allBasisIndices = self.auto_vivification() for deg in self.DegreeArray: keys = self.allBasisIndices.keys() if deg not in np.fromiter(keys, dtype=float): # Generate the polynomial basis indices for qidx, q in enumerate(self.pce_q_norm): basis_indices = self.create_basis_indices(degree=deg, q_norm=q) self.allBasisIndices[str(deg)][str(q)] = basis_indices # Evaluate the univariate polynomials on ExpDesign if self.meta_model_type.lower() != 'gpe': self.univ_p_val = self.univ_basis_vals(CollocationPoints) if 'x_values' in OutputDict: self.ExpDesign.x_values = OutputDict['x_values'] del OutputDict['x_values'] # --- Loop through data points and fit the surrogate --- if not verbose: print(f"\n>>>> Training the {self.meta_model_type} metamodel " "started. <<<<<<\n") items = tqdm(OutputDict.items(), desc="Fitting regression") else: items = OutputDict.items() # For loop over the components/outputs for key, Output in items: # Dimensionality reduction with PCA, if specified if self.dim_red_method.lower() == 'pca': self.pca[key], target = self.pca_transformation(Output) else: target = Output # Parallel fit regression if self.meta_model_type.lower() == 'gpe': # Prepare the input matrix scaler = MinMaxScaler() X_S = scaler.fit_transform(CollocationPoints) self.x_scaler[key] = scaler out = Parallel(n_jobs=-1, prefer='threads')( delayed(self.gaussian_process_emulator)(X_S, target[:, idx]) for idx in range(target.shape[1])) for idx in range(target.shape[1]): self.gp_poly[key][f"y_{idx+1}"] = out[idx] else: out = Parallel(n_jobs=-1, prefer='threads')( delayed(self.adaptive_regression)(CollocationPoints, target[:, idx], idx) for idx in range(target.shape[1])) for i in range(target.shape[1]): # Create a dict to pass the variables self.deg_dict[key][f"y_{i+1}"] = out[i]['degree'] self.q_norm_dict[key][f"y_{i+1}"] = out[i]['qnorm'] self.coeffs_dict[key][f"y_{i+1}"] = out[i]['coeffs'] self.basis_dict[key][f"y_{i+1}"] = out[i]['multi_indices'] self.score_dict[key][f"y_{i+1}"] = out[i]['LOOCVScore'] self.clf_poly[key][f"y_{i+1}"] = out[i]['clf_poly'] self.LCerror[key][f"y_{i+1}"] = out[i]['LCerror'] if not verbose: print(f"\n>>>> Training the {self.meta_model_type} metamodel" " sucessfully completed. <<<<<<\n") return self
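After training, the fitted quantities are stored in nested dictionaries keyed by output name and component index. A sketch of inspecting them; the output name 'Output1' is hypothetical.

# Train directly on a 'normal' (one-shot) experimental design
metamodel.train_norm_design(Model)

# LOOCV score of each fitted component, per output
for out, comp_scores in metamodel.score_dict.items():
    for comp, q2 in comp_scores.items():
        print(out, comp, f"Q2 = {q2:.4f}")

# Chosen degree and q-norm for a single component (names hypothetical)
print(metamodel.deg_dict['Output1']['y_1'],
      metamodel.q_norm_dict['Output1']['y_1'])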
def create_basis_indices(self, degree, q_norm)
-
Creates the set of multi-indices of multivariate polynomials for a given number of parameters, polynomial degree and hyperbolic (q-norm) truncation scheme.
Parameters
degree
:int
- Polynomial degree.
q_norm
:float
- Hyperbolic (q-norm) truncation factor.
Returns
basis_indices
:array
of shape (n_terms, n_params)
- Multi-indices of multivariate polynomials.
Expand source code
def create_basis_indices(self, degree, q_norm):
    """
    Creates the set of multi-indices of multivariate polynomials for a
    given number of parameters, polynomial degree and hyperbolic (q-norm)
    truncation scheme.

    Parameters
    ----------
    degree : int
        Polynomial degree.
    q_norm : float
        Hyperbolic (q-norm) truncation factor.

    Returns
    -------
    basis_indices : array of shape (n_terms, n_params)
        Multi-indices of multivariate polynomials.
    """
    basis_indices = glexindex(start=0, stop=degree+1,
                              dimensions=self.n_params,
                              cross_truncation=q_norm,
                              reverse=False, graded=True)
    return basis_indices
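The effect of the hyperbolic truncation can be reproduced with a small self-contained sketch (plain itertools, not the glexindex routine used above): q_norm = 1 keeps the full total-degree basis of size C(n_params + degree, degree), while q_norm < 1 prunes high-interaction terms.

from itertools import product
from math import comb

def hyperbolic_indices(n_params, degree, q_norm):
    # All multi-indices alpha with ||alpha||_q <= degree
    keep = []
    for alpha in product(range(degree + 1), repeat=n_params):
        if sum(a ** q_norm for a in alpha) ** (1.0 / q_norm) <= degree + 1e-10:
            keep.append(alpha)
    return keep

assert len(hyperbolic_indices(3, 4, q_norm=1.0)) == comb(3 + 4, 4)   # 35 terms
print(len(hyperbolic_indices(3, 4, q_norm=0.5)))                     # fewer terms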
def add_ExpDesign(self)
-
Instantiates the experimental design object.
Returns
None.
Expand source code
def add_ExpDesign(self):
    """
    Instantiates the experimental design object.

    Returns
    -------
    None.
    """
    self.ExpDesign = ExpDesigns(self.input_obj,
                                meta_Model=self.meta_model_type)
def generate_ExpDesign(self, Model)
-
Prepares the experimental design, either by reading it from the prescribed data or by running the model simulations.
Parameters
Model
:obj
- Model object.
Raises
Exception
- If model simulations are not provided properly.
Returns
ED_X_tr
:array
of shape (n_samples, n_params)
- Training samples transformed by an isoprobabilistic transformation.
ED_Y
:dict
- Model simulations (target) for all outputs.
Expand source code
def generate_ExpDesign(self, Model): """ Prepares the experimental design either by reading from the prescribed data or running simulations. Parameters ---------- Model : obj Model object. Raises ------ Exception If model sumulations are not provided properly. Returns ------- ED_X_tr: array of shape (n_samples, n_params) Training samples transformed by an isoprobabilistic transformation. ED_Y: dict Model simulations (target) for all outputs. """ ExpDesign = self.ExpDesign if self.ExpDesignFlag != 'sequential': # Read ExpDesign (training and targets) from the provided hdf5 if ExpDesign.hdf5_file is not None: # Read hdf5 file f = h5py.File(ExpDesign.hdf5_file, 'r+') # Read EDX and pass it to ExpDesign object try: ExpDesign.X = np.array(f["EDX/New_init_"]) except KeyError: ExpDesign.X = np.array(f["EDX/init_"]) # Update number of initial samples ExpDesign.n_init_samples = ExpDesign.X.shape[0] # Read EDX and pass it to ExpDesign object out_names = self.ModelObj.Output.names ExpDesign.Y = {} # Extract x values try: ExpDesign.Y["x_values"] = dict() for varIdx, var in enumerate(out_names): x = np.array(f[f"x_values/{var}"]) ExpDesign.Y["x_values"][var] = x except KeyError: ExpDesign.Y["x_values"] = np.array(f["x_values"]) # Store the output for varIdx, var in enumerate(out_names): try: y = np.array(f[f"EDY/{var}/New_init_"]) except KeyError: y = np.array(f[f"EDY/{var}/init_"]) ExpDesign.Y[var] = y f.close() else: # Check if an old hdf5 file exists: if yes, rename it hdf5file = f'ExpDesign_{self.ModelObj.name}.hdf5' if os.path.exists(hdf5file): os.rename(hdf5file, 'old_'+hdf5file) # ---- Prepare X samples ---- ED_X, ED_X_tr = ExpDesign.generate_ED(ExpDesign.n_init_samples, ExpDesign.sampling_method, transform=True, max_pce_deg=np.max(self.pce_deg)) ExpDesign.X = ED_X ExpDesign.collocationPoints = ED_X_tr self.bound_tuples = ExpDesign.bound_tuples # ---- Run simulations at X ---- if not hasattr(ExpDesign, 'Y') or ExpDesign.Y is None: print('\n Now the forward model needs to be run!\n') ED_Y, up_ED_X = Model.run_model_parallel(ED_X) ExpDesign.X = up_ED_X self.ModelOutputDict = ED_Y ExpDesign.Y = ED_Y else: # Check if a dict has been passed. if type(ExpDesign.Y) is dict: self.ModelOutputDict = ExpDesign.Y else: raise Exception('Please provide either a dictionary or a hdf5' 'file to ExpDesign.hdf5_file argument.') return ED_X_tr, self.ModelOutputDict
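The hdf5 layout this method reads can be produced as follows; the file and output names ('ExpDesign_MyModel.hdf5', 'Output1', 'Output2') are placeholders. Datasets named New_init_, if present, take precedence over init_.

import h5py
import numpy as np

rng = np.random.default_rng(0)
with h5py.File('ExpDesign_MyModel.hdf5', 'w') as f:
    f.create_dataset('EDX/init_', data=rng.random((50, 3)))      # 50 samples, 3 params
    f.create_dataset('x_values', data=np.linspace(0, 10, 101))   # shared coordinates
    for var in ('Output1', 'Output2'):
        f.create_dataset(f'EDY/{var}/init_', data=rng.random((50, 101)))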
def univ_basis_vals(self, samples, n_max=None)
-
Evaluates univariate regressors along input directions.
Parameters
samples
:array
of shape (n_samples, n_params)
- Samples.
n_max
:int
, optional- Maximum polynomial degree. The default is
None
.
Returns
univ_basis
:array
of shape (n_samples, n_params, n_max+1)
- All univariate regressors up to n_max.
Expand source code
def univ_basis_vals(self, samples, n_max=None):
    """
    Evaluates univariate regressors along input directions.

    Parameters
    ----------
    samples : array of shape (n_samples, n_params)
        Samples.
    n_max : int, optional
        Maximum polynomial degree. The default is `None`.

    Returns
    -------
    univ_basis : array of shape (n_samples, n_params, n_max+1)
        All univariate regressors up to n_max.
    """
    # Extract information
    poly_types = self.ExpDesign.poly_types
    if samples.ndim != 2:
        samples = samples.reshape(1, len(samples))
    n_max = np.max(self.pce_deg) if n_max is None else n_max

    # Extract poly coeffs
    if self.ExpDesign.input_data_given or self.ExpDesign.apce:
        apolycoeffs = self.ExpDesign.polycoeffs
    else:
        apolycoeffs = None

    # Evaluate univariate basis
    univ_basis = eval_univ_basis(samples, n_max, poly_types, apolycoeffs)

    return univ_basis
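A self-contained stand-in for eval_univ_basis, assuming uniform inputs on [-1, 1] and a Legendre basis; it reproduces the (n_samples, n_params, n_max+1) layout used throughout this class.

import numpy as np
from numpy.polynomial.legendre import legval

def legendre_univ_basis(samples, n_max):
    n_samples, n_params = samples.shape
    univ = np.empty((n_samples, n_params, n_max + 1))
    for d in range(n_max + 1):
        c = np.zeros(d + 1)
        c[d] = 1.0                      # select the degree-d Legendre polynomial
        univ[:, :, d] = legval(samples, c)
    return univ

vals = legendre_univ_basis(np.random.uniform(-1, 1, (5, 2)), n_max=3)
print(vals.shape)                       # (5, 2, 4)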
def create_psi(self, basis_indices, univ_p_val)
-
Assembles the design matrix Psi from the given basis index set basis_indices and the univariate polynomial evaluations univ_p_val.
Parameters
basis_indices
:array
of shape (n_terms, n_params)
- Multi-indices of multivariate polynomials.
univ_p_val
:array
of shape (n_samples, n_params, n_max+1)
- All univariate regressors up to
n_max
.
Raises
ValueError
- Raised if the number of terms in basis_indices and univ_p_val does not match.
Returns
psi
:array
of shape (n_samples, n_terms)
- Multivariate regressors.
Expand source code
def create_psi(self, basis_indices, univ_p_val):
    """
    Assembles the design matrix Psi from the given basis index set
    basis_indices and the univariate polynomial evaluations univ_p_val.

    Parameters
    ----------
    basis_indices : array of shape (n_terms, n_params)
        Multi-indices of multivariate polynomials.
    univ_p_val : array of shape (n_samples, n_params, n_max+1)
        All univariate regressors up to `n_max`.

    Raises
    ------
    ValueError
        If the number of terms in the arguments does not match.

    Returns
    -------
    psi : array of shape (n_samples, n_terms)
        Multivariate regressors.
    """
    # Check if BasisIndices is a sparse matrix
    sparsity = sp.sparse.issparse(basis_indices)
    if sparsity:
        basis_indices = basis_indices.toarray()

    # Number of input variables
    n_params = univ_p_val.shape[1]
    # Size of the experimental design
    n_samples = univ_p_val.shape[0]
    # Number of basis terms
    n_terms = basis_indices.shape[0]

    # Check that the variables have consistent sizes
    if n_params != basis_indices.shape[1]:
        raise ValueError("The shapes of basis_indices and univ_p_val don't"
                         " match!")

    # Preallocate the Psi matrix for performance
    psi = np.ones((n_samples, n_terms))
    # Assemble the Psi matrix, one input dimension at a time
    for m in range(basis_indices.shape[1]):
        aa = np.where(basis_indices[:, m] > 0)[0]
        basisIdx = basis_indices[aa, m]
        bb = np.reshape(univ_p_val[:, m, basisIdx], psi[:, aa].shape)
        psi[:, aa] = np.multiply(psi[:, aa], bb)
    return psi
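The construction is equivalent to the following direct (slower, but easier to read) sketch: entry (i, k) of Psi is the product over dimensions m of the univariate polynomial of degree indices[k, m] evaluated at sample i.

import numpy as np

def assemble_psi(univ, indices):
    # Psi[i, k] = prod_m univ[i, m, indices[k, m]]
    n_samples = univ.shape[0]
    psi = np.ones((n_samples, indices.shape[0]))
    for k, alpha in enumerate(indices):
        for m, deg in enumerate(alpha):
            psi[:, k] *= univ[:, m, deg]
    return psi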
def fit(self, X, y, basis_indices, reg_method=None)
-
Fits the regression using the provided regression method.
Parameters
X
:array
of shape (n_samples, n_features)
- Training vector, where n_samples is the number of samples and n_features is the number of features.
y
:array
of shape (n_samples,)
- Target values.
basis_indices
:array
of shape (n_terms, n_params)
- Multi-indices of multivariate polynomials.
reg_method
:str
, optional- Regression method to use. The default is None, in which case self.pce_reg_method is used.
Returns
return_out_dict
:Dict
- Dictionary containing the fitted estimator ('clf_poly'), the sparse multi-indices ('spareMulti-Index'), the sparse design matrix ('sparePsi') and the coefficients ('coeffs').
Expand source code
def fit(self, X, y, basis_indices, reg_method=None): """ Fit regression using the regression method provided. Parameters ---------- X : array of shape (n_samples, n_features) Training vector, where n_samples is the number of samples and n_features is the number of features. y : array of shape (n_samples,) Target values. basis_indices : array of shape (n_terms, n_params) Multi-indices of multivariate polynomials. reg_method : str, optional DESCRIPTION. The default is None. Returns ------- return_out_dict : Dict Fitted estimator, spareMulti-Index, sparseX and coefficients. """ if reg_method is None: reg_method = self.pce_reg_method # Check if BasisIndices is a sparse matrix sparsity = sp.sparse.issparse(basis_indices) clf_poly = [] compute_score = True if self.verbose else False # inverse of the observed variance of the data if np.var(y) != 0: Lambda = 1 / np.var(y) else: Lambda = 1e-6 # Bayes sparse adaptive aPCE if reg_method.lower() != 'ols': if reg_method.lower() == 'brr' or np.var(y) == 0: clf_poly = lm.BayesianRidge(n_iter=1000, tol=1e-7, fit_intercept=True, normalize=True, compute_score=compute_score, alpha_1=1e-04, alpha_2=1e-04, lambda_1=Lambda, lambda_2=Lambda) clf_poly.converged = True elif reg_method.lower() == 'ard': clf_poly = lm.ARDRegression(fit_intercept=True, normalize=True, compute_score=compute_score, n_iter=1000, tol=0.0001, alpha_1=1e-3, alpha_2=1e-3, lambda_1=Lambda, lambda_2=Lambda) elif reg_method.lower() == 'fastard': clf_poly = RegressionFastARD(fit_intercept=True, normalize=True, compute_score=compute_score, n_iter=300, tol=1e-10) elif reg_method.lower() == 'bcs': clf_poly = RegressionFastLaplace(fit_intercept=False, n_iter=1000, tol=1e-7) elif reg_method.lower() == 'lars': clf_poly = lm.LassoLarsCV(fit_intercept=False) elif reg_method.lower() == 'sgdr': clf_poly = lm.SGDRegressor(fit_intercept=False, max_iter=5000, tol=1e-7) elif reg_method.lower() == 'omp': clf_poly = lm.OrthogonalMatchingPursuitCV(fit_intercept=False, max_iter=10) elif reg_method.lower() == 'vbl': clf_poly = VBLinearRegression(fit_intercept=False) elif reg_method.lower() == 'ebl': clf_poly = EBLinearRegression(optimizer='em') # Fit clf_poly.fit(X, y) # Select the nonzero entries of coefficients # The first column must be kept (For mean calculations) nnz_idx = np.nonzero(clf_poly.coef_)[0] if len(nnz_idx) == 0 or nnz_idx[0] != 0: nnz_idx = np.insert(np.nonzero(clf_poly.coef_)[0], 0, 0) # Remove the zero entries for Bases and PSI if need be if sparsity: sparse_basis_indices = basis_indices.toarray()[nnz_idx] else: sparse_basis_indices = basis_indices[nnz_idx] sparse_X = X[:, nnz_idx] # Store the coefficients of the regression model clf_poly.fit(sparse_X, y) coeffs = clf_poly.coef_ else: # This is for the case where all outputs are zero, thereby # all coefficients are zero if sparsity: sparse_basis_indices = basis_indices.toarray() else: sparse_basis_indices = basis_indices sparse_X = X coeffs = clf_poly.coef_ # Ordinary least square method (OLS) else: if sparsity: sparse_basis_indices = basis_indices.toarray() else: sparse_basis_indices = basis_indices sparse_X = X X_T_X = np.dot(sparse_X.T, sparse_X) if np.linalg.cond(X_T_X) > 1e-12 and \ np.linalg.cond(X_T_X) < 1 / sys.float_info.epsilon: # faster coeffs = sp.linalg.solve(X_T_X, np.dot(sparse_X.T, y)) else: # stabler coeffs = np.dot(np.dot(np.linalg.pinv(X_T_X), sparse_X.T), y) # Create a dict to pass the outputs return_out_dict = dict() return_out_dict['clf_poly'] = clf_poly return_out_dict['spareMulti-Index'] = sparse_basis_indices 
return_out_dict['sparePsi'] = sparse_X return_out_dict['coeffs'] = coeffs return return_out_dict
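The sparse-refit mechanics can be illustrated with plain scikit-learn. This is a sketch, not the exact hyperparameters used above; note that recent scikit-learn releases removed the normalize argument that appears in this source.

import numpy as np
from sklearn.linear_model import BayesianRidge

rng = np.random.default_rng(0)
psi = rng.normal(size=(50, 10))
psi[:, 0] = 1.0                         # constant term, kept for the mean
y = 2.0 + 3.0 * psi[:, 3] + 0.01 * rng.normal(size=50)

clf = BayesianRidge(fit_intercept=True, compute_score=True)
clf.fit(psi, y)

# Keep the constant column plus all nonzero coefficients, then refit
nnz_idx = np.nonzero(clf.coef_)[0]
if len(nnz_idx) == 0 or nnz_idx[0] != 0:
    nnz_idx = np.insert(nnz_idx, 0, 0)
clf.fit(psi[:, nnz_idx], y)
print(clf.coef_.shape)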
def adaptive_regression(self, ED_X, ED_Y, varIdx, verbose=False)
-
Adaptively fits the PCE model by comparing the scores of different degrees and q-norms.
Parameters
ED_X
:array
of shape (n_samples, n_params)
- Experimental design.
ED_Y
:array
of shape (n_samples,)
- Target values, i.e. simulation results for the Experimental design.
varIdx
:int
- Index of the output.
verbose
:bool
, optional- Print out a summary. The default is False.
Returns
returnVars
:Dict
- Fitted estimator, best degree, best q-norm, LOOCV score and coefficients.
Expand source code
def adaptive_regression(self, ED_X, ED_Y, varIdx, verbose=False): """ Adaptively fits the PCE model by comparing the scores of different degrees and q-norm. Parameters ---------- ED_X : array of shape (n_samples, n_params) Experimental design. ED_Y : array of shape (n_samples,) Target values, i.e. simulation results for the Experimental design. varIdx : int Index of the output. verbose : bool, optional Print out summary. The default is False. Returns ------- returnVars : Dict Fitted estimator, best degree, best q-norm, LOOCVScore and coefficients. """ NrSamples, n_params = ED_X.shape # Initialization qAllCoeffs, AllCoeffs = {}, {} qAllIndices_Sparse, AllIndices_Sparse = {}, {} qAllclf_poly, Allclf_poly = {}, {} qAllnTerms, AllnTerms = {}, {} qAllLCerror, AllLCerror = {}, {} # Extract degree array and qnorm array DegreeArray = np.array([*self.allBasisIndices], dtype=int) qnorm = [*self.allBasisIndices[str(int(DegreeArray[0]))]] # Some options for EarlyStop errorIncreases = False # Stop degree, if LOO error does not decrease n_checks_degree times n_checks_degree = 3 # Stop qNorm, if criterion isn't fulfilled n_checks_qNorm times n_checks_qNorm = 2 nqnorms = len(qnorm) qNormEarlyStop = True if nqnorms < n_checks_qNorm+1: qNormEarlyStop = False # ===================================================================== # basis adaptive polynomial chaos: repeat the calculation by increasing # polynomial degree until the highest accuracy is reached # ===================================================================== # For each degree check all q-norms and choose the best one scores = -np.inf * np.ones(DegreeArray.shape[0]) qNormScores = -np.inf * np.ones(nqnorms) for degIdx, deg in enumerate(DegreeArray): for qidx, q in enumerate(qnorm): # Extract the polynomial basis indices from the pool of # allBasisIndices BasisIndices = self.allBasisIndices[str(deg)][str(q)] # Assemble the Psi matrix Psi = self.create_psi(BasisIndices, self.univ_p_val) # Calulate the cofficients of the meta model outs = self.fit(Psi, ED_Y, BasisIndices) # Calculate and save the score of LOOCV score, LCerror = self.corr_loocv_error(outs['clf_poly'], outs['sparePsi'], outs['coeffs'], ED_Y) # Check the convergence of noise for FastARD if self.pce_reg_method == 'FastARD' and \ outs['clf_poly'].alpha_ < np.finfo(np.float32).eps: score = -np.inf qNormScores[qidx] = score qAllCoeffs[str(qidx+1)] = outs['coeffs'] qAllIndices_Sparse[str(qidx+1)] = outs['spareMulti-Index'] qAllclf_poly[str(qidx+1)] = outs['clf_poly'] qAllnTerms[str(qidx+1)] = BasisIndices.shape[0] qAllLCerror[str(qidx+1)] = LCerror # EarlyStop check # if there are at least n_checks_qNorm entries after the # best one, we stop if qNormEarlyStop and \ sum(np.isfinite(qNormScores)) > n_checks_qNorm: # If the error has increased the last two iterations, stop! 
qNormScores_nonInf = qNormScores[np.isfinite(qNormScores)] deltas = np.sign(np.diff(qNormScores_nonInf)) if sum(deltas[-n_checks_qNorm+1:]) == 2: # stop the q-norm loop here break if np.var(ED_Y) == 0: break # Store the score in the scores list best_q = np.nanargmax(qNormScores) scores[degIdx] = qNormScores[best_q] AllCoeffs[str(degIdx+1)] = qAllCoeffs[str(best_q+1)] AllIndices_Sparse[str(degIdx+1)] = qAllIndices_Sparse[str(best_q+1)] Allclf_poly[str(degIdx+1)] = qAllclf_poly[str(best_q+1)] AllnTerms[str(degIdx+1)] = qAllnTerms[str(best_q+1)] AllLCerror[str(degIdx+1)] = qAllLCerror[str(best_q+1)] # Check the direction of the error (on average): # if it increases consistently stop the iterations if len(scores[scores != -np.inf]) > n_checks_degree: scores_nonInf = scores[scores != -np.inf] ss = np.sign(scores_nonInf - np.max(scores_nonInf)) # ss<0 error decreasing errorIncreases = np.sum(np.sum(ss[-2:])) <= -1*n_checks_degree if errorIncreases: break # Check only one degree, if target matrix has zero variance if np.var(ED_Y) == 0: break # ------------------ Summary of results ------------------ # Select the one with the best score and save the necessary outputs best_deg = np.nanargmax(scores)+1 coeffs = AllCoeffs[str(best_deg)] basis_indices = AllIndices_Sparse[str(best_deg)] clf_poly = Allclf_poly[str(best_deg)] LOOCVScore = np.nanmax(scores) P = AllnTerms[str(best_deg)] LCerror = AllLCerror[str(best_deg)] degree = DegreeArray[np.nanargmax(scores)] qnorm = float(qnorm[best_q]) # ------------------ Print out Summary of results ------------------ if self.verbose: # Create PSI_Sparse by removing redundent terms nnz_idx = np.nonzero(coeffs)[0] BasisIndices_Sparse = basis_indices[nnz_idx] print(f'Output variable {varIdx+1}:') print('The estimation of PCE coefficients converged at polynomial ' f'degree {DegreeArray[best_deg-1]} with ' f'{len(BasisIndices_Sparse)} terms (Sparsity index = ' f'{round(len(BasisIndices_Sparse)/P, 3)}).') print(f'Final ModLOO error estimate: {1-max(scores):.3e}') print('\n'+'-'*50) if verbose: print('='*50) print(' '*10 + ' Summary of results ') print('='*50) print("scores:\n", scores) print("Best score's degree:", self.DegreeArray[best_deg-1]) print("NO. of terms:", len(basis_indices)) print("Sparsity index:", round(len(basis_indices)/P, 3)) print("Best Indices:\n", basis_indices) if self.pce_reg_method in ['BRR', 'ARD']: fig, ax = plt.subplots(figsize=(12, 10)) plt.title("Marginal log-likelihood") plt.plot(clf_poly.scores_, color='navy', linewidth=2) plt.ylabel("Score") plt.xlabel("Iterations") if self.pce_reg_method.lower() == 'bbr': text = f"$\\alpha={clf_poly.alpha_:.1f}$\n" f"$\\lambda={clf_poly.lambda_:.3f}$\n" f"$L={clf_poly.scores_[-1]:.1f}$" else: text = f"$\\alpha={clf_poly.alpha_:.1f}$\n$" f"\\L={clf_poly.scores_[-1]:.1f}$" plt.text(0.75, 0.5, text, fontsize=18, transform=ax.transAxes) plt.show() print('='*80) # Create a dict to pass the outputs returnVars = dict() returnVars['clf_poly'] = clf_poly returnVars['degree'] = degree returnVars['qnorm'] = qnorm returnVars['coeffs'] = coeffs returnVars['multi_indices'] = basis_indices returnVars['LOOCVScore'] = LOOCVScore returnVars['LCerror'] = LCerror return returnVars
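Stripped of the early-stopping bookkeeping, the search reduces to: for each candidate degree keep the best q-norm score, then keep the best degree overall. A toy sketch with a synthetic score function in place of corr_loocv_error:

import numpy as np

degrees = [1, 2, 3, 4]
qnorms = [0.5, 0.75, 1.0]

def loo_score(d, q):
    return -(d - 3) ** 2 - (q - 0.75) ** 2   # synthetic stand-in for the LOO score

scores, best_qs = [], []
for d in degrees:
    q_scores = [loo_score(d, q) for q in qnorms]
    scores.append(max(q_scores))
    best_qs.append(qnorms[int(np.argmax(q_scores))])

best = int(np.argmax(scores))
print(degrees[best], best_qs[best])          # -> 3 0.75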
def corr_loocv_error(self, clf, psi, coeffs, y)
-
Calculates the corrected LOO error for regression on regressor matrix
psi
that generated the coefficients based on [1] and [2].
[1] Blatman, G., 2009. Adaptive sparse polynomial chaos expansions for uncertainty propagation and sensitivity analysis (Doctoral dissertation, Clermont-Ferrand 2).
[2] Blatman, G. and Sudret, B., 2011. Adaptive sparse polynomial chaos expansion based on least angle regression. Journal of Computational Physics, 230(6), pp.2345-2367.
Parameters
clf
:object
- Fitted estimator.
psi
:array
of shape (n_samples, n_features)
- The multivariate orthogonal polynomials (regressor).
coeffs
:array-like
of shape (n_features,)
- Estimated coefficients.
y
:array
of shape (n_samples,)
- Target values.
Returns
Q_2
:float
- LOOCV validation score (1 - LOOCV error).
residual
:array
of shape (n_samples,)
- Residual values (y - predicted targets).
Expand source code
def corr_loocv_error(self, clf, psi, coeffs, y):
    """
    Calculates the corrected LOO error for regression on regressor
    matrix `psi` that generated the coefficients based on [1] and [2].

    [1] Blatman, G., 2009. Adaptive sparse polynomial chaos expansions
        for uncertainty propagation and sensitivity analysis (Doctoral
        dissertation, Clermont-Ferrand 2).
    [2] Blatman, G. and Sudret, B., 2011. Adaptive sparse polynomial
        chaos expansion based on least angle regression. Journal of
        Computational Physics, 230(6), pp.2345-2367.

    Parameters
    ----------
    clf : object
        Fitted estimator.
    psi : array of shape (n_samples, n_features)
        The multivariate orthogonal polynomials (regressor).
    coeffs : array-like of shape (n_features,)
        Estimated coefficients.
    y : array of shape (n_samples,)
        Target values.

    Returns
    -------
    Q_2 : float
        LOOCV validation score (1 - LOOCV error).
    residual : array of shape (n_samples,)
        Residual values (y - predicted targets).
    """
    psi = np.array(psi, dtype=float)

    # Create PSI_Sparse by removing redundant terms
    nnz_idx = np.nonzero(coeffs)[0]
    if len(nnz_idx) == 0:
        nnz_idx = [0]
    psi_sparse = psi[:, nnz_idx]

    # NrCoeffs of aPCEs
    P = len(nnz_idx)
    # NrEvaluation (size of the experimental design)
    N = psi.shape[0]

    # Build the projection matrix
    PsiTPsi = np.dot(psi_sparse.T, psi_sparse)
    if np.linalg.cond(PsiTPsi) > 1e-12 and \
       np.linalg.cond(PsiTPsi) < 1 / sys.float_info.epsilon:
        # faster
        M = sp.linalg.solve(PsiTPsi,
                            sp.sparse.eye(PsiTPsi.shape[0]).toarray())
    else:
        # stabler
        M = np.linalg.pinv(PsiTPsi)

    # h factor (the full matrix is not calculated explicitly,
    # only the trace is, to save memory)
    PsiM = np.dot(psi_sparse, M)
    h = np.sum(np.multiply(PsiM, psi_sparse), axis=1, dtype=np.float128)

    # ------ Calculate the LOO error for each measurement point ------
    # Residuals
    if isinstance(clf, list):
        residual = np.dot(psi, coeffs) - y
    else:
        residual = clf.predict(psi) - y

    # Variance
    varY = np.var(y)

    if varY == 0:
        normEmpErr = 0
        ErrLoo = 0
        LCerror = np.zeros((y.shape))
    else:
        normEmpErr = np.mean(residual**2) / varY
        LCerror = residual / (1 - h)[:, np.newaxis]
        ErrLoo = np.mean(np.square(LCerror)) / varY

    # If there are NaNs, just return an infinite LOO error (this
    # happens, e.g., when a strongly underdetermined problem is solved)
    if np.isnan(ErrLoo):
        ErrLoo = np.inf

    # Corrected error for over-determined systems
    trM = np.trace(M)
    if trM < 0 or abs(trM) > 1e6:
        trM = np.trace(np.linalg.pinv(np.dot(psi.T, psi)))

    # Over-determined system of equations
    if N > P:
        T_factor = N / (N - P) * (1 + trM)
    # Under-determined system of equations
    else:
        T_factor = np.inf

    CorrectedErrLoo = ErrLoo * T_factor

    Q_2 = 1 - CorrectedErrLoo

    return Q_2, residual
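For an OLS fit, the corrected LOO estimator above reduces to the following compact sketch: the residuals are inflated by the hat-matrix diagonal h, normalized by Var(y), and multiplied by the Blatman/Sudret correction factor N/(N-P) (1 + tr M).

import numpy as np

def corrected_loo_q2(psi, coeffs, y):
    N, P = psi.shape
    M = np.linalg.pinv(psi.T @ psi)
    h = np.einsum('ij,jk,ik->i', psi, M, psi)     # diagonal of the hat matrix
    residual = psi @ coeffs - y
    err_loo = np.mean((residual / (1.0 - h)) ** 2) / np.var(y)
    T = N / (N - P) * (1.0 + np.trace(M)) if N > P else np.inf
    return 1.0 - T * err_loo

rng = np.random.default_rng(1)
psi = np.column_stack([np.ones(30), rng.normal(size=(30, 2))])
y = psi @ np.array([1.0, 2.0, -0.5]) + 0.05 * rng.normal(size=30)
coeffs, *_ = np.linalg.lstsq(psi, y, rcond=None)
print(corrected_loo_q2(psi, coeffs, y))           # close to 1 for a good fit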
def pca_transformation(self, Output)
-
Transforms the targets (outputs) via Principal Component Analysis.
Parameters
Output
:array
of shape (n_samples, n_features)
- Target values.
Returns
pca
:obj
- Fitted sklearnPCA object.
OutputMatrix
:array
of shape (n_samples, n_pca_components)
- Transformed target values.
Expand source code
def pca_transformation(self, Output):
    """
    Transforms the targets (outputs) via Principal Component Analysis.

    Parameters
    ----------
    Output : array of shape (n_samples, n_features)
        Target values.

    Returns
    -------
    pca : obj
        Fitted sklearnPCA object.
    OutputMatrix : array of shape (n_samples, n_pca_components)
        Transformed target values.
    """
    # Transform via Principal Component Analysis
    if hasattr(self, 'var_pca_threshold'):
        var_pca_threshold = self.var_pca_threshold
    else:
        var_pca_threshold = 100.0
    n_samples, n_features = Output.shape

    if hasattr(self, 'n_pca_components'):
        n_pca_components = self.n_pca_components
    else:
        # Instantiate and fit sklearnPCA object
        covar_matrix = sklearnPCA(n_components=None)
        covar_matrix.fit(Output)
        var = np.cumsum(np.round(covar_matrix.explained_variance_ratio_,
                                 decimals=5)*100)
        # Find the number of components needed to explain
        # var_pca_threshold percent of the variance
        try:
            n_components = np.where(var >= var_pca_threshold)[0][0] + 1
        except IndexError:
            n_components = min(n_samples, n_features)

        n_pca_components = min(n_samples, n_features, n_components)

    # Print out a report
    print()
    print('-' * 50)
    print(f"PCA transformation is performed with {n_pca_components}"
          " components.")
    print('-' * 50)
    print()

    # Fit and transform with the selected number of components
    pca = sklearnPCA(n_components=n_pca_components,
                     svd_solver='randomized')
    OutputMatrix = pca.fit_transform(Output)

    return pca, OutputMatrix
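The transformed scores are what the regressions are fitted to; predictions are later mapped back with mean_ + scores @ components_ (see eval_metamodel). A sketch with toy data:

import numpy as np
from sklearn.decomposition import PCA

Y = np.random.default_rng(2).normal(size=(40, 200))   # 40 runs, 200 points each
pca = PCA(n_components=5, svd_solver='randomized')
scores = pca.fit_transform(Y)                          # targets for the surrogates
Y_back = pca.mean_ + scores @ pca.components_          # back-transformation
print(np.abs(Y - Y_back).max())                        # truncation error of 5 components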
def gaussian_process_emulator(self, X, y, nug_term=None, autoSelect=False, varIdx=None)
-
Fits a Gaussian Process Emulator to the target given the training points.
Parameters
X
:array
of shape (n_samples, n_params)
- Training points.
y
:array
of shape (n_samples,)
- Target values.
nug_term
:float
, optional- Nugget term. The default is None, i.e. variance of y.
autoSelect
:bool
, optional- Loop over some kernels and select the best. The default is False.
varIdx
:int
, optional- The index number. The default is None.
Returns
gp
:object
- Fitted estimator.
Expand source code
def gaussian_process_emulator(self, X, y, nug_term=None, autoSelect=False,
                              varIdx=None):
    """
    Fits a Gaussian Process Emulator to the target given the training
    points.

    Parameters
    ----------
    X : array of shape (n_samples, n_params)
        Training points.
    y : array of shape (n_samples,)
        Target values.
    nug_term : float, optional
        Nugget term. The default is None, i.e. variance of y.
    autoSelect : bool, optional
        Loop over some kernels and select the best. The default is False.
    varIdx : int, optional
        The index number. The default is None.

    Returns
    -------
    gp : object
        Fitted estimator.
    """
    nug_term = nug_term if nug_term else np.var(y)

    Kernels = [nug_term * kernels.RBF(length_scale=1.0,
                                      length_scale_bounds=(1e-25, 1e15)),
               nug_term * kernels.RationalQuadratic(length_scale=0.2,
                                                    alpha=1.0),
               nug_term * kernels.Matern(length_scale=1.0,
                                         length_scale_bounds=(1e-15, 1e5),
                                         nu=1.5)]

    # Automatic selection of the kernel
    if autoSelect:
        gp = {}
        BME = []
        for i, kernel in enumerate(Kernels):
            gp[i] = GaussianProcessRegressor(kernel=kernel,
                                             n_restarts_optimizer=3,
                                             normalize_y=False)

            # Fit to data using Maximum Likelihood Estimation
            gp[i].fit(X, y)

            # Store the MLE as BME score
            BME.append(gp[i].log_marginal_likelihood())

        gp = gp[np.argmax(BME)]
    else:
        gp = GaussianProcessRegressor(kernel=Kernels[0],
                                      n_restarts_optimizer=3,
                                      normalize_y=False)
        gp.fit(X, y)

    # Compute score
    if varIdx is not None:
        Score = gp.score(X, y)
        print('-'*50)
        print(f'Output variable {varIdx}:')
        print('The estimation of GPE coefficients converged,')
        print(f'with the R^2 score: {Score:.3f}')
        print('-'*50)

    return gp
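An equivalent stand-alone fit with scikit-learn, using the same variance-scaled RBF default kernel (a sketch with toy data):

import numpy as np
from sklearn.gaussian_process import GaussianProcessRegressor, kernels

rng = np.random.default_rng(3)
X = rng.uniform(0, 1, size=(30, 2))
y = np.sin(2 * np.pi * X[:, 0]) + 0.1 * rng.normal(size=30)

kernel = np.var(y) * kernels.RBF(length_scale=1.0,
                                 length_scale_bounds=(1e-25, 1e15))
gp = GaussianProcessRegressor(kernel=kernel, n_restarts_optimizer=3,
                              normalize_y=False)
gp.fit(X, y)
print(gp.score(X, y), gp.log_marginal_likelihood())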
def eval_metamodel(self, samples=None, nsamples=None, sampling_method='random', return_samples=False)
-
Evaluates the meta-model at the requested samples. Alternatively, a number of samples (nsamples) can be generated and evaluated.
Parameters
samples
:array of shape (n_samples, n_params), optional
- Samples to evaluate the meta-model at. The default is None.
nsamples
:int, optional
- Number of samples to generate, if no samples are provided. The default is None.
sampling_method
:str, optional
- Type of sampling, if no samples are provided. The default is 'random'.
return_samples
:bool, optional
- Whether to also return the samples, if no samples are provided. The default is False.
Returns
mean_pred
:dict
- Mean of the predictions.
std_pred
:dict
- Standard deviation of the predictions.
Expand source code
def eval_metamodel(self, samples=None, nsamples=None,
                   sampling_method='random', return_samples=False):
    """
    Evaluates the meta-model at the requested samples. One can also
    generate nsamples.

    Parameters
    ----------
    samples : array of shape (n_samples, n_params), optional
        Samples to evaluate the meta-model at. The default is None.
    nsamples : int, optional
        Number of samples to generate, if no `samples` is provided.
        The default is None.
    sampling_method : str, optional
        Type of sampling, if no `samples` is provided. The default is
        'random'.
    return_samples : bool, optional
        Return the samples, if no `samples` is provided. The default is
        False.

    Returns
    -------
    mean_pred : dict
        Mean of the predictions.
    std_pred : dict
        Standard deviation of the predictions.
    """
    if self.meta_model_type.lower() == 'gpe':
        model_dict = self.gp_poly
    else:
        model_dict = self.coeffs_dict

    if samples is None:
        if nsamples is None:
            self.n_samples = 100000
        else:
            self.n_samples = nsamples
        samples = self.ExpDesign.generate_samples(self.n_samples,
                                                  sampling_method)
    else:
        self.samples = samples
        self.n_samples = len(samples)

    # Transform samples
    samples = self.ExpDesign.transform(samples)

    if self.meta_model_type.lower() != 'gpe':
        univ_p_val = self.univ_basis_vals(samples,
                                          n_max=np.max(self.pce_deg))

    mean_pred = {}
    std_pred = {}

    # Loop over outputs
    for output, values in model_dict.items():
        mean = np.zeros((len(samples), len(values)))
        std = np.zeros((len(samples), len(values)))
        idx = 0
        for in_key, InIdxValues in values.items():

            # Prediction with GPE
            if self.meta_model_type.lower() == 'gpe':
                X_T = self.x_scaler[output].transform(samples)
                gp = self.gp_poly[output][in_key]
                y_mean, y_std = gp.predict(X_T, return_std=True)

            else:
                # Prediction with PCE or PCE-Kriging
                # Assemble Psi matrix
                psi = self.create_psi(self.basis_dict[output][in_key],
                                      univ_p_val)
                # Prediction
                try:
                    # with error bar
                    clf_poly = self.clf_poly[output][in_key]
                    y_mean, y_std = clf_poly.predict(psi, return_std=True)
                except:
                    # without error bar
                    coeffs = self.coeffs_dict[output][in_key]
                    y_mean = np.dot(psi, coeffs)
                    y_std = np.zeros_like(y_mean)

            mean[:, idx] = y_mean
            std[:, idx] = y_std
            idx += 1

        if self.dim_red_method.lower() == 'pca':
            PCA = self.pca[output]
            mean_pred[output] = PCA.mean_ + np.dot(mean, PCA.components_)
            std_pred[output] = np.sqrt(np.dot(std**2, PCA.components_**2))
        else:
            mean_pred[output] = mean
            std_pred[output] = std

    if return_samples:
        return mean_pred, std_pred, samples
    else:
        return mean_pred, std_pred
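In the non-GPE branch above, the prediction per output key reduces to a matrix-vector product between the evaluated basis and the coefficient vector. A runnable stand-in sketch, where Psi and coeffs are random placeholders for create_psi(...) and the coeffs_dict entries:

import numpy as np

rng = np.random.default_rng(1)
n_samples, n_terms = 5, 10
Psi = rng.standard_normal((n_samples, n_terms))  # evaluated basis
coeffs = rng.standard_normal(n_terms)            # PCE coefficients

y_mean = Psi @ coeffs          # mean prediction, shape (5,)
y_std = np.zeros_like(y_mean)  # no error bar without a Bayesian solver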
def create_model_error(self, X, y, name='Calib')
-
Fits a GPE-based model error.
Parameters
X
:array of shape (n_outputs, n_inputs)
- Input array. It can contain any forcing inputs or coordinates of extracted data.
y
:array of shape (n_outputs,)
- The model response for the MAP parameter set.
name
:str, optional
- Calibration or validation. The default is 'Calib'.
Returns
self
:object
- Self object.
Expand source code
def create_model_error(self, X, y, name='Calib'):
    """
    Fits a GPE-based model error.

    Parameters
    ----------
    X : array of shape (n_outputs, n_inputs)
        Input array. It can contain any forcing inputs or coordinates of
        extracted data.
    y : array of shape (n_outputs,)
        The model response for the MAP parameter set.
    name : str, optional
        Calibration or validation. The default is 'Calib'.

    Returns
    -------
    self : object
        Self object.
    """
    Model = self.ModelObj
    outputNames = Model.Output.Names
    self.errorRegMethod = 'GPE'
    self.errorclf_poly = self.auto_vivification()
    self.errorScale = self.auto_vivification()

    # Read data
    MeasuredData = Model.read_observation(case=name)

    # Fitting GPR based bias model
    for out in outputNames:
        nan_idx = ~np.isnan(MeasuredData[out])
        # Select data
        try:
            data = MeasuredData[out].values[nan_idx]
        except AttributeError:
            data = MeasuredData[out][nan_idx]

        # Prepare the input matrix
        scaler = MinMaxScaler()
        delta = data  # - y[out][0]
        BiasInputs = np.hstack((X[out], y[out].reshape(-1, 1)))
        X_S = scaler.fit_transform(BiasInputs)
        gp = self.gaussian_process_emulator(X_S, delta)

        self.errorScale[out]["y_1"] = scaler
        self.errorclf_poly[out]["y_1"] = gp

    return self
def eval_model_error(self, X, y_pred)
-
Evaluates the error model.
Parameters
X
:array
- Inputs.
y_pred
:dict
- Predictions.
Returns
mean_pred
:dict
- Mean prediction of the GPE-based error model.
std_pred
:dict
- Standard deviation of the GPE-based error model.
Expand source code
def eval_model_error(self, X, y_pred):
    """
    Evaluates the error model.

    Parameters
    ----------
    X : array
        Inputs.
    y_pred : dict
        Predictions.

    Returns
    -------
    mean_pred : dict
        Mean prediction of the GPE-based error model.
    std_pred : dict
        Standard deviation of the GPE-based error model.
    """
    mean_pred = {}
    std_pred = {}

    for Outkey, ValuesDict in self.errorclf_poly.items():

        pred_mean = np.zeros_like(y_pred[Outkey])
        pred_std = np.zeros_like(y_pred[Outkey])

        for Inkey, InIdxValues in ValuesDict.items():

            gp = self.errorclf_poly[Outkey][Inkey]
            scaler = self.errorScale[Outkey][Inkey]

            # Transform samples using the scaler
            for j, pred in enumerate(y_pred[Outkey]):
                BiasInputs = np.hstack((X[Outkey], pred.reshape(-1, 1)))
                Samples_S = scaler.transform(BiasInputs)
                y_hat, y_std = gp.predict(Samples_S, return_std=True)
                pred_mean[j] = y_hat
                pred_std[j] = y_std
                # pred_mean[j] += pred

        mean_pred[Outkey] = pred_mean
        std_pred[Outkey] = pred_std

    return mean_pred, std_pred
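Both error-model methods share the same input assembly: the GP sees the coordinates (or forcings) with the model response appended as the last column, scaled to [0, 1]. A runnable sketch with stand-in data:

import numpy as np
from sklearn.preprocessing import MinMaxScaler

n_obs = 8
X_out = np.linspace(0.0, 1.0, n_obs).reshape(-1, 1)  # e.g. coordinates
y_map = np.sin(2 * np.pi * X_out[:, 0])              # model response

bias_inputs = np.hstack((X_out, y_map.reshape(-1, 1)))  # (8, 2)
scaler = MinMaxScaler()
bias_inputs_scaled = scaler.fit_transform(bias_inputs)
print(bias_inputs_scaled.shape)  # each column scaled to [0, 1]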
class PostProcessing (MetaModel, name='calib')
-
This class provides many helper functions to post-process the trained meta-model.
Attributes
MetaModel
:obj
- MetaModel object to do postprocessing on.
name
:str
- Type of the analysis. The default is 'calib'. If a validation is expected to be performed, change this to 'valid'.
Expand source code
class PostProcessing: """ This class provides many helper functions to post-process the trained meta-model. Attributes ---------- MetaModel : obj MetaModel object to do postprocessing on. name : str Type of the anaylsis. The default is `'calib'`. If a validation is expected to be performed change this to `'valid'`. """ def __init__(self, MetaModel, name='calib'): self.MetaModel = MetaModel self.name = name # ------------------------------------------------------------------------- def plot_moments(self, xlabel='Time [s]', plot_type=None, save_fig=True): """ Plots the moments in a pdf format in the directory `Outputs_PostProcessing`. Parameters ---------- xlabel : str, optional String to be displayed as x-label. The default is `'Time [s]'`. plot_type : str, optional Options: bar or line. The default is `None`. save_fig : bool, optional Save figure or not. The default is `True`. Returns ------- pce_means: dict Mean of the model outputs. pce_means: dict Standard deviation of the model outputs. """ bar_plot = True if plot_type == 'bar' else False meta_model_type = self.MetaModel.meta_model_type Model = self.MetaModel.ModelObj # Read Monte-Carlo reference self.mc_reference = Model.read_mc_reference() print(self.mc_reference) # Set the x values x_values_orig = self.MetaModel.ExpDesign.x_values # Compute the moments with the PCEModel object self._compute_pce_moments() # Get the variables out_names = Model.Output.names # Open a pdf for the plots if save_fig: newpath = (f'Outputs_PostProcessing_{self.name}/') if not os.path.exists(newpath): os.makedirs(newpath) # create a PdfPages object pdf = PdfPages(f'./{newpath}Mean_Std_PCE.pdf') # Plot the best fit line, set the linewidth (lw), color and # transparency (alpha) of the line for idx, key in enumerate(out_names): fig, ax = plt.subplots(nrows=1, ncols=2) # Extract mean and std mean_data = self.pce_means[key] std_data = self.pce_stds[key] # Extract a list of x values if type(x_values_orig) is dict: x = x_values_orig[key] else: x = x_values_orig # Plot: bar plot or line plot if bar_plot: ax[0].bar(list(map(str, x)), mean_data, color='b', width=0.25) ax[1].bar(list(map(str, x)), std_data, color='b', width=0.25) ax[0].legend(labels=[meta_model_type]) ax[1].legend(labels=[meta_model_type]) else: ax[0].plot(x, mean_data, lw=3, color='k', marker='x', label=meta_model_type) ax[1].plot(x, std_data, lw=3, color='k', marker='x', label=meta_model_type) if self.mc_reference is not None: if bar_plot: ax[0].bar(list(map(str, x)), self.mc_reference['mean'], color='r', width=0.25) ax[1].bar(list(map(str, x)), self.mc_reference['std'], color='r', width=0.25) ax[0].legend(labels=[meta_model_type]) ax[1].legend(labels=[meta_model_type]) else: ax[0].plot(x, self.mc_reference['mean'], lw=3, marker='x', color='r', label='Ref.') ax[1].plot(x, self.mc_reference['std'], lw=3, marker='x', color='r', label='Ref.') # Label the axes and provide a title ax[0].set_xlabel(xlabel) ax[1].set_xlabel(xlabel) ax[0].set_ylabel(key) ax[1].set_ylabel(key) # Provide a title ax[0].set_title('Mean of ' + key) ax[1].set_title('Std of ' + key) if not bar_plot: ax[0].legend(loc='best') ax[1].legend(loc='best') plt.tight_layout() if save_fig: # save the current figure pdf.savefig(fig, bbox_inches='tight') # Destroy the current plot plt.clf() pdf.close() return self.pce_means, self.pce_stds # ------------------------------------------------------------------------- def valid_metamodel(self, n_samples=1, samples=None, x_axis='Time [s]'): """ Evaluates and plots the meta model and the PCEModel 
outputs for the given number of samples or the given samples. Parameters ---------- n_samples : int, optional Number of samples to be evaluated. The default is 1. samples : array of shape (n_samples, n_params), optional Samples to be evaluated. The default is None. x_axis : str, optional Label of x axis. The default is `'Time [s]'`. Returns ------- None. """ MetaModel = self.MetaModel Model = MetaModel.ModelObj if samples is None: self.n_samples = n_samples samples = self._get_sample() else: self.n_samples = samples.shape[0] # Extract x_values x_values = MetaModel.ExpDesign.x_values self.model_out_dict = self._eval_model(samples, key_str='valid') self.pce_out_mean, self.pce_out_std = MetaModel.eval_metamodel(samples) try: key = Model.Output.names[1] except IndexError: key = Model.Output.names[0] n_obs = self.model_out_dict[key].shape[1] if n_obs == 1: self._plot_validation() else: self._plot_validation_multi(x_values=x_values, x_axis=x_axis) # ------------------------------------------------------------------------- def check_accuracy(self, n_samples=None, samples=None, outputs=None): """ Checks accuracy of the metamodel by computing the root mean square error and validation error for all outputs. Parameters ---------- n_samples : int, optional Number of samples. The default is None. samples : array of shape (n_samples, n_params), optional Parameter sets to be checked. The default is None. outputs : dict, optional Output dictionary with model outputs for all given output types in `Model.Output.names`. The default is None. Raises ------ Exception When neither n_samples nor samples are provided. Returns ------- rmse: dict Root mean squared error for each output. valid_error : dict Validation error for each output. """ MetaModel = self.MetaModel Model = MetaModel.ModelObj # Set the number of samples if n_samples: self.n_samples = n_samples elif samples is not None: self.n_samples = samples.shape[0] else: raise Exception("Please provide either samples or pass number of " "samples!") # Generate random samples if necessary Samples = self._get_sample() if samples is None else samples # Run the original model with the generated samples if outputs is None: outputs = self._eval_model(Samples, key_str='validSet') # Run the PCE model with the generated samples pce_outputs, _ = MetaModel.eval_metamodel(samples=Samples) self.rmse = {} self.valid_error = {} # Loop over the keys and compute RMSE error. for key in Model.Output.names: # Root mena square self.rmse[key] = mean_squared_error(outputs[key], pce_outputs[key], squared=False, multioutput='raw_values') # Validation error self.valid_error[key] = (self.rmse[key]**2 / self.n_samples) / \ np.var(outputs[key], ddof=1, axis=0) # Print a report table print("\n>>>>> Errors of {} <<<<<".format(key)) print("\nIndex | RMSE | Validation Error") print('-'*35) print('\n'.join(f'{i+1} | {k:.3e} | {j:.3e}' for i, (k, j) in enumerate(zip(self.rmse[key], self.valid_error[key])))) # Save error dicts in PCEModel object self.MetaModel.rmse = self.rmse self.MetaModel.valid_error = self.valid_error return self.rmse, self.valid_error # ------------------------------------------------------------------------- def plot_seq_design_diagnostics(self, ref_BME_KLD=None, save_fig=True): """ Plots the Bayesian Model Evidence (BME) and Kullback-Leibler divergence (KLD) for the sequential design. Parameters ---------- ref_BME_KLD : array, optional Reference BME and KLD . The default is `None`. save_fig : bool, optional Whether to save the figures. The default is `True`. 
Returns ------- None. """ PCEModel = self.MetaModel n_init_samples = PCEModel.ExpDesign.n_init_samples n_total_samples = PCEModel.ExpDesign.X.shape[0] if save_fig: newpath = f'Outputs_PostProcessing_{self.name}/' if not os.path.exists(newpath): os.makedirs(newpath) # create a PdfPages object pdf = PdfPages(f'./{newpath}/seqPCEModelDiagnostics.pdf') plotList = ['Modified LOO error', 'Validation error', 'KLD', 'BME', 'RMSEMean', 'RMSEStd', 'Hellinger distance'] seqList = [PCEModel.SeqModifiedLOO, PCEModel.seqValidError, PCEModel.SeqKLD, PCEModel.SeqBME, PCEModel.seqRMSEMean, PCEModel.seqRMSEStd, PCEModel.SeqDistHellinger] markers = ('x', 'o', 'd', '*', '+') colors = ('k', 'darkgreen', 'b', 'navy', 'darkred') # Plot the evolution of the diagnostic criteria of the # Sequential Experimental Design. for plotidx, plot in enumerate(plotList): fig, ax = plt.subplots() seq_dict = seqList[plotidx] name_util = list(seq_dict.keys()) if len(name_util) == 0: continue # Box plot when Replications have been detected. if any(int(name.split("rep_", 1)[1]) > 1 for name in name_util): # Extract the values from dict sorted_seq_opt = {} # Number of replications n_reps = PCEModel.ExpDesign.nReprications # Get the list of utility function names # Handle if only one UtilityFunction is provided if not isinstance(PCEModel.ExpDesign.UtilityFunction, list): util_funcs = [PCEModel.ExpDesign.UtilityFunction] else: util_funcs = PCEModel.ExpDesign.UtilityFunction for util in util_funcs: sortedSeq = {} # min number of runs available from reps n_runs = min([seq_dict[f'{util}_rep_{i+1}'].shape[0] for i in range(n_reps)]) for runIdx in range(n_runs): values = [] for key in seq_dict.keys(): if util in key: values.append(seq_dict[key][runIdx].mean()) sortedSeq['SeqItr_'+str(runIdx)] = np.array(values) sorted_seq_opt[util] = sortedSeq # BoxPlot def draw_plot(data, labels, edge_color, fill_color, idx): pos = labels - (idx-1) bp = plt.boxplot(data, positions=pos, labels=labels, patch_artist=True, sym='', widths=0.75) elements = ['boxes', 'whiskers', 'fliers', 'means', 'medians', 'caps'] for element in elements: plt.setp(bp[element], color=edge_color[idx]) for patch in bp['boxes']: patch.set(facecolor=fill_color[idx]) if PCEModel.ExpDesign.n_new_samples != 1: step1 = PCEModel.ExpDesign.n_new_samples step2 = 1 else: step1 = 5 step2 = 5 edge_color = ['red', 'blue', 'green'] fill_color = ['tan', 'cyan', 'lightgreen'] plot_label = plot # Plot for different Utility Functions for idx, util in enumerate(util_funcs): all_errors = np.empty((n_reps, 0)) for key in list(sorted_seq_opt[util].keys()): errors = sorted_seq_opt.get(util, {}).get(key)[:, None] all_errors = np.hstack((all_errors, errors)) # Special cases for BME and KLD if plot == 'KLD' or plot == 'BME': # BME convergence if refBME is provided if ref_BME_KLD is not None: if plot == 'BME': refValue = ref_BME_KLD[0] plot_label = r'$BME/BME^{Ref.}$' if plot == 'KLD': refValue = ref_BME_KLD[1] plot_label = '$D_{KL}[p(\theta|y_*),p(\theta)]'\ ' / D_{KL}^{Ref.}[p(\theta|y_*), '\ 'p(\theta)]$' # Difference between BME/KLD and the ref. values all_errors = np.divide(all_errors, np.full((all_errors.shape), refValue)) # Plot baseline for zero, i.e. 
no difference plt.axhline(y=1.0, xmin=0, xmax=1, c='green', ls='--', lw=2) # Plot each UtilFuncs labels = np.arange(n_init_samples, n_total_samples+1, step1) draw_plot(all_errors[:, ::step2], labels, edge_color, fill_color, idx) plt.xticks(labels, labels) # Set the major and minor locators ax.xaxis.set_major_locator(ticker.AutoLocator()) ax.xaxis.set_minor_locator(ticker.AutoMinorLocator()) ax.xaxis.grid(True, which='major', linestyle='-') ax.xaxis.grid(True, which='minor', linestyle='--') # Legend legend_elements = [] for idx, util in enumerate(util_funcs): legend_elements.append(Patch(facecolor=fill_color[idx], edgecolor=edge_color[idx], label=util)) plt.legend(handles=legend_elements[::-1], loc='best') if plot != 'BME' and plot != 'KLD': plt.yscale('log') plt.autoscale(True) plt.xlabel('\\# of training samples') plt.ylabel(plot_label) plt.title(plot) if save_fig: # save the current figure pdf.savefig(fig, bbox_inches='tight') # Destroy the current plot plt.clf() # Save arrays into files f = open(f'./{newpath}/Seq{plot}.txt', 'w') f.write(str(sorted_seq_opt)) f.close() else: for idx, name in enumerate(name_util): seq_values = seq_dict[name] if PCEModel.ExpDesign.n_new_samples != 1: step = PCEModel.ExpDesign.n_new_samples else: step = 1 x_idx = np.arange(n_init_samples, n_total_samples+1, step) if n_total_samples not in x_idx: x_idx = np.hstack((x_idx, n_total_samples)) if plot == 'KLD' or plot == 'BME': # BME convergence if refBME is provided if ref_BME_KLD is not None: if plot == 'BME': refValue = ref_BME_KLD[0] plot_label = r'$BME/BME^{Ref.}$' if plot == 'KLD': refValue = ref_BME_KLD[1] plot_label = '$D_{KL}[p(\theta|y_*),p(\theta)]'\ ' / D_{KL}^{Ref.}[p(\theta|y_*), '\ 'p(\theta)]$' # Difference between BME/KLD and the ref. values values = np.divide(seq_values, np.full((seq_values.shape), refValue)) # Plot baseline for zero, i.e. 
no difference plt.axhline(y=1.0, xmin=0, xmax=1, c='green', ls='--', lw=2) # Set the limits plt.ylim([1e-1, 1e1]) # Create the plots plt.semilogy(x_idx, values, marker=markers[idx], color=colors[idx], ls='--', lw=2, label=name.split("_rep", 1)[0]) else: plot_label = plot # Create the plots plt.plot(x_idx, seq_values, marker=markers[idx], color=colors[idx], ls='--', lw=2, label=name.split("_rep", 1)[0]) else: plot_label = plot seq_values = np.nan_to_num(seq_values) # Plot the error evolution for each output for i in range(seq_values.shape[1]): plt.semilogy(x_idx, seq_values[:, i], ls='--', lw=2, marker=markers[idx], color=colors[idx], alpha=0.15) plt.semilogy(x_idx, seq_values, marker=markers[idx], ls='--', lw=2, color=colors[idx], label=name.split("_rep", 1)[0]) # Set the major and minor locators ax.xaxis.set_major_locator(ticker.AutoLocator()) ax.xaxis.set_minor_locator(ticker.AutoMinorLocator()) ax.xaxis.grid(True, which='major', linestyle='-') ax.xaxis.grid(True, which='minor', linestyle='--') ax.tick_params(axis='both', which='major', direction='in', width=3, length=10) ax.tick_params(axis='both', which='minor', direction='in', width=2, length=8) plt.xlabel('Number of runs') plt.ylabel(plot_label) plt.title(plot) plt.legend(frameon=True) if save_fig: # save the current figure pdf.savefig(fig, bbox_inches='tight') # Destroy the current plot plt.clf() # ---------------- Saving arrays into files --------------- np.save(f'./{newpath}/Seq{plot}.npy', seq_values) # Close the pdf pdf.close() return # ------------------------------------------------------------------------- def sobol_indices(self, xlabel='Time [s]', plot_type=None, save_fig=True): """ Provides Sobol indices as a sensitivity measure to infer the importance of the input parameters. See Eq. 27 in [1] for more details. For the case with Principal component analysis refer to [2]. [1] Global sensitivity analysis: A flexible and efficient framework with an example from stochastic hydrogeology S. Oladyshkin, F.P. de Barros, W. Nowak https://doi.org/10.1016/j.advwatres.2011.11.001 [2] Nagel, J.B., Rieckermann, J. and Sudret, B., 2020. Principal component analysis and sparse polynomial chaos expansions for global sensitivity analysis and model calibration: Application to urban drainage simulation. Reliability Engineering & System Safety, 195, p.106737. Parameters ---------- xlabel : str, optional Label of the x-axis. The default is `'Time [s]'`. plot_type : str, optional Plot type. The default is `None`. This corresponds to line plot. Bar chart can be selected by `bar`. save_fig : bool, optional Whether to save the figures. The default is `True`. Returns ------- sobol_cell: dict Sobol indices. total_sobol: dict Total Sobol indices. """ # Extract the necessary variables PCEModel = self.MetaModel basis_dict = PCEModel.basis_dict coeffs_dict = PCEModel.coeffs_dict n_params = PCEModel.n_params max_order = np.max(PCEModel.pce_deg) self.sobol_cell = {} self.total_sobol = {} for Output in PCEModel.ModelObj.Output.names: n_meas_points = len(coeffs_dict[Output]) # Initialize the (cell) array containing the (total) Sobol indices. 
sobol_array = dict.fromkeys(range(1, max_order+1), []) sobol_cell_array = dict.fromkeys(range(1, max_order+1), []) for i_order in range(1, max_order+1): n_comb = math.comb(n_params, i_order) sobol_cell_array[i_order] = np.zeros((n_comb, n_meas_points)) total_sobol_array = np.zeros((n_params, n_meas_points)) # Initialize the cell to store the names of the variables TotalVariance = np.zeros((n_meas_points)) # Loop over all measurement points and calculate sobol indices for pIdx in range(n_meas_points): # Extract the basis indices (alpha) and coefficients Basis = basis_dict[Output][f'y_{pIdx+1}'] try: clf_poly = PCEModel.clf_poly[Output][f'y_{pIdx+1}'] PCECoeffs = clf_poly.coef_ except: PCECoeffs = coeffs_dict[Output][f'y_{pIdx+1}'] # Compute total variance TotalVariance[pIdx] = np.sum(np.square(PCECoeffs[1:])) nzidx = np.where(PCECoeffs != 0)[0] # Set all the Sobol indices equal to zero in the presence of a # null output. if len(nzidx) == 0: # This is buggy. for i_order in range(1, max_order+1): sobol_cell_array[i_order][:, pIdx] = 0 # Otherwise compute them by summing well-chosen coefficients else: nz_basis = Basis[nzidx] for i_order in range(1, max_order+1): idx = np.where(np.sum(nz_basis > 0, axis=1) == i_order) subbasis = nz_basis[idx] Z = np.array(list(combinations(range(n_params), i_order))) for q in range(Z.shape[0]): Zq = Z[q] subsubbasis = subbasis[:, Zq] subidx = np.prod(subsubbasis, axis=1) > 0 sum_ind = nzidx[idx[0][subidx]] if TotalVariance[pIdx] == 0.0: sobol_cell_array[i_order][q, pIdx] = 0.0 else: sobol = np.sum(np.square(PCECoeffs[sum_ind])) sobol /= TotalVariance[pIdx] sobol_cell_array[i_order][q, pIdx] = sobol # Compute the TOTAL Sobol indices. for ParIdx in range(n_params): idx = nz_basis[:, ParIdx] > 0 sum_ind = nzidx[idx] if TotalVariance[pIdx] == 0.0: total_sobol_array[ParIdx, pIdx] = 0.0 else: sobol = np.sum(np.square(PCECoeffs[sum_ind])) sobol /= TotalVariance[pIdx] total_sobol_array[ParIdx, pIdx] = sobol # ----- if PCA selected: Compute covariance ----- if PCEModel.dim_red_method.lower() == 'pca': cov_Z_p_q = np.zeros((n_params)) # Extract the basis indices (alpha) and coefficients for # next component if pIdx < n_meas_points-1: nextBasis = basis_dict[Output][f'y_{pIdx+2}'] try: clf_poly = PCEModel.clf_poly[Output][f'y_{pIdx+2}'] nextPCECoeffs = clf_poly.coef_ except: nextPCECoeffs = coeffs_dict[Output][f'y_{pIdx+2}'] # Choose the common non-zero basis mask = (Basis[:, None] == nextBasis).all(-1).any(-1) similar_basis = Basis[mask] # Compute the TOTAL Sobol indices. for ParIdx in range(n_params): idx = similar_basis[:, ParIdx] > 0 try: sum_is = nzidx[idx] cov_Z_p_q[ParIdx] = np.sum(PCECoeffs[sum_ind] * nextPCECoeffs[sum_is]) except: cov_Z_p_q[ParIdx] = 0.0 # Compute the sobol indices according to Ref. 
2 if PCEModel.dim_red_method.lower() == 'pca': n_c_points = PCEModel.ExpDesign.Y[Output].shape[1] PCA = PCEModel.pca[Output] compPCA = PCA.components_ nComp = compPCA.shape[0] var_Z_p = PCA.explained_variance_ # Extract the sobol index of the components for i_order in range(1, max_order+1): n_comb = math.comb(n_params, i_order) sobol_array[i_order] = np.zeros((n_comb, n_c_points)) Z = np.array(list(combinations(range(n_params), i_order))) for q in range(Z.shape[0]): S_Z_i = sobol_cell_array[i_order][q] for tIdx in range(n_c_points): var_Y_t = np.var(PCEModel.ExpDesign.Y[Output][:, tIdx]) if var_Y_t == 0.0: term1, term2 = 0.0, 0.0 else: term1 = np.sum([S_Z_i[i]*(var_Z_p[i]*(compPCA[i, tIdx]**2)/var_Y_t) for i in range(nComp)]) # Term 2 # cov_Z_p_q = np.ones((nComp))# TODO: from coeffs Phi_t_p = compPCA[:nComp-1] Phi_t_q = compPCA term2 = 2 * np.sum([cov_Z_p_q[ParIdx] * Phi_t_p[i,tIdx] * Phi_t_q[i,tIdx]/var_Y_t for i in range(nComp-1)]) sobol_array[i_order][q, tIdx] = term1 #+ term2 # Compute the TOTAL Sobol indices. total_sobol = np.zeros((n_params, n_c_points)) for ParIdx in range(n_params): S_Z_i = total_sobol_array[ParIdx] for tIdx in range(n_c_points): var_Y_t = np.var(PCEModel.ExpDesign.Y[Output][:, tIdx]) if var_Y_t == 0.0: term1, term2 = 0.0, 0.0 else: term1 = 0 for i in range(nComp): term1 += S_Z_i[i] * var_Z_p[i] * \ (compPCA[i, tIdx]**2) / var_Y_t # Term 2 # cov_Z_p_q = np.ones((nComp))# TODO: from coeffs Phi_t_p = compPCA[:nComp-1] Phi_t_q = compPCA term2 = 0 for i in range(nComp-1): term2 += cov_Z_p_q[ParIdx] * Phi_t_p[i, tIdx] \ * Phi_t_q[i, tIdx] / var_Y_t term2 *= 2 total_sobol[ParIdx, tIdx] = term1 + term2 self.sobol_cell[Output] = sobol_array self.total_sobol[Output] = total_sobol else: self.sobol_cell[Output] = sobol_cell_array self.total_sobol[Output] = total_sobol_array # ---------------- Plot ----------------------- par_names = PCEModel.ExpDesign.par_names x_values_orig = PCEModel.ExpDesign.x_values cases = [''] for case in cases: newpath = (f'Outputs_PostProcessing_{self.name}/') if not os.path.exists(newpath): os.makedirs(newpath) if save_fig: # create a PdfPages object name = case+'_' if 'Valid' in cases else '' pdf = PdfPages('./'+newpath+name+'Sobol_indices.pdf') fig = plt.figure() for outIdx, Output in enumerate(PCEModel.ModelObj.Output.names): # Extract total Sobol indices total_sobol = self.total_sobol[Output] # Extract a list of x values if type(x_values_orig) is dict: x = x_values_orig[Output] else: x = x_values_orig if plot_type == 'bar': ax = fig.add_axes([0, 0, 1, 1]) dict1 = {xlabel: x} dict2 = {param: sobolIndices for param, sobolIndices in zip(par_names, total_sobol)} df = pd.DataFrame({**dict1, **dict2}) df.plot(x=xlabel, y=par_names, kind="bar", ax=ax, rot=0, colormap='Dark2') ax.set_ylabel('Total Sobol indices, $S^T$') else: for i, sobolIndices in enumerate(total_sobol): plt.plot(x, sobolIndices, label=par_names[i], marker='x', lw=2.5) plt.ylabel('Total Sobol indices, $S^T$') plt.xlabel(xlabel) plt.title(f'Sensitivity analysis of {Output}') if plot_type != 'bar': plt.legend(loc='best', frameon=True) # Save indices np.savetxt(f'./{newpath}{name}totalsobol_' + Output.replace('/', '_') + '.csv', total_sobol.T, delimiter=',', header=','.join(par_names), comments='') if save_fig: # save the current figure pdf.savefig(fig, bbox_inches='tight') # Destroy the current plot plt.clf() pdf.close() return self.sobol_cell, self.total_sobol # ------------------------------------------------------------------------- def check_reg_quality(self, n_samples=1000, 
samples=None, save_fig=True): """ Checks the quality of the metamodel for single output models based on: https://towardsdatascience.com/how-do-you-check-the-quality-of-your-regression-model-in-python-fa61759ff685 Parameters ---------- n_samples : int, optional Number of parameter sets to use for the check. The default is 1000. samples : array of shape (n_samples, n_params), optional Parameter sets to use for the check. The default is None. save_fig : bool, optional Whether to save the figures. The default is True. Returns ------- None. """ MetaModel = self.MetaModel if samples is None: self.n_samples = n_samples samples = self._get_sample() else: self.n_samples = samples.shape[0] # Evaluate the original and the surrogate model y_val = self._eval_model(samples, key_str='valid') y_pce_val, _ = MetaModel.eval_metamodel(samples=samples) # Open a pdf for the plots if save_fig: newpath = (r'Outputs_PostProcessing_{0}/'.format(self.name)) if not os.path.exists(newpath): os.makedirs(newpath) # Fit the data(train the model) for key in y_pce_val.keys(): y_pce_val_ = y_pce_val[key] y_val_ = y_val[key] # ------ Residuals vs. predicting variables ------ # Check the assumptions of linearity and independence fig1 = plt.figure() plt.title(key+": Residuals vs. Predicting variables") residuals = y_val_ - y_pce_val_ plt.scatter(x=y_val_, y=residuals, color='blue', edgecolor='k') plt.grid(True) xmin, xmax = min(y_val_), max(y_val_) plt.hlines(y=0, xmin=xmin*0.9, xmax=xmax*1.1, color='red', lw=3, linestyle='--') plt.xlabel(key) plt.ylabel('Residuals') plt.show() if save_fig: # save the current figure fig1.savefig(f'./{newpath}/Residuals_vs_PredVariables.pdf', bbox_inches='tight') # Destroy the current plot plt.clf() # ------ Fitted vs. residuals ------ # Check the assumptions of linearity and independence fig2 = plt.figure() plt.title(key+": Residuals vs. predicting variables") plt.scatter(x=y_pce_val_, y=residuals, color='blue', edgecolor='k') plt.grid(True) xmin, xmax = min(y_val_), max(y_val_) plt.hlines(y=0, xmin=xmin*0.9, xmax=xmax*1.1, color='red', lw=3, linestyle='--') plt.xlabel(key) plt.ylabel('Residuals') plt.show() if save_fig: # save the current figure fig2.savefig(f'./{newpath}/Fitted_vs_Residuals.pdf', bbox_inches='tight') # Destroy the current plot plt.clf() # ------ Histogram of normalized residuals ------ fig3 = plt.figure() resid_pearson = residuals / (max(residuals)-min(residuals)) plt.hist(resid_pearson, bins=20, edgecolor='k') plt.ylabel('Count') plt.xlabel('Normalized residuals') plt.title(f"{key}: Histogram of normalized residuals") # Normality (Shapiro-Wilk) test of the residuals ax = plt.gca() _, p = stats.shapiro(residuals) if p < 0.01: annText = "The residuals seem to come from Gaussian process." else: annText = "The normality assumption may not hold." 
at = AnchoredText(annText, prop=dict(size=30), frameon=True, loc='upper left') at.patch.set_boxstyle("round,pad=0.,rounding_size=0.2") ax.add_artist(at) plt.show() if save_fig: # save the current figure fig3.savefig(f'./{newpath}/Hist_NormResiduals.pdf', bbox_inches='tight') # Destroy the current plot plt.clf() # ------ Q-Q plot of the normalized residuals ------ plt.figure() fig4 = qqplot(resid_pearson, line='45', fit='True') plt.xticks() plt.yticks() plt.xlabel("Theoretical quantiles") plt.ylabel("Sample quantiles") plt.title(key+": Q-Q plot of normalized residuals") plt.grid(True) plt.show() if save_fig: # save the current figure fig4.savefig(f'./{newpath}/QQPlot_NormResiduals.pdf', bbox_inches='tight') # Destroy the current plot plt.clf() # ------------------------------------------------------------------------- def eval_pce_model_3d(self, save_fig=True): self.n_samples = 1000 PCEModel = self.MetaModel Model = self.MetaModel.ModelObj n_samples = self.n_samples # Create 3D-Grid # TODO: Make it general x = np.linspace(-5, 10, n_samples) y = np.linspace(0, 15, n_samples) X, Y = np.meshgrid(x, y) PCE_Z = np.zeros((self.n_samples, self.n_samples)) Model_Z = np.zeros((self.n_samples, self.n_samples)) for idxMesh in range(self.n_samples): sample_mesh = np.vstack((X[:, idxMesh], Y[:, idxMesh])).T univ_p_val = PCEModel.univ_basis_vals(sample_mesh) for Outkey, ValuesDict in PCEModel.coeffs_dict.items(): pce_out_mean = np.zeros((len(sample_mesh), len(ValuesDict))) pce_out_std = np.zeros((len(sample_mesh), len(ValuesDict))) model_outs = np.zeros((len(sample_mesh), len(ValuesDict))) for Inkey, InIdxValues in ValuesDict.items(): idx = int(Inkey.split('_')[1]) - 1 basis_deg_ind = PCEModel.basis_dict[Outkey][Inkey] clf_poly = PCEModel.clf_poly[Outkey][Inkey] PSI_Val = PCEModel.create_psi(basis_deg_ind, univ_p_val) # Perdiction with error bar y_mean, y_std = clf_poly.predict(PSI_Val, return_std=True) pce_out_mean[:, idx] = y_mean pce_out_std[:, idx] = y_std # Model evaluation model_out_dict, _ = Model.run_model_parallel(sample_mesh, key_str='Valid3D') model_outs[:, idx] = model_out_dict[Outkey].T PCE_Z[:, idxMesh] = y_mean Model_Z[:, idxMesh] = model_outs[:, 0] # ---------------- 3D plot for PCEModel ----------------------- fig_PCE = plt.figure() ax = plt.axes(projection='3d') ax.plot_surface(X, Y, PCE_Z, rstride=1, cstride=1, cmap='viridis', edgecolor='none') ax.set_title('PCEModel') ax.set_xlabel('$x_1$') ax.set_ylabel('$x_2$') ax.set_zlabel('$f(x_1,x_2)$') plt.grid() plt.show() if save_fig: # Saving the figure newpath = f'Outputs_PostProcessing_{self.name}/' if not os.path.exists(newpath): os.makedirs(newpath) # save the figure to file fig_PCE.savefig(f'./{newpath}/3DPlot_PCEModel.pdf', format="pdf", bbox_inches='tight') plt.close(fig_PCE) # ---------------- 3D plot for Model ----------------------- fig_Model = plt.figure() ax = plt.axes(projection='3d') ax.plot_surface(X, Y, PCE_Z, rstride=1, cstride=1, cmap='viridis', edgecolor='none') ax.set_title('Model') ax.set_xlabel('$x_1$') ax.set_ylabel('$x_2$') ax.set_zlabel('$f(x_1,x_2)$') plt.grid() plt.show() if save_fig: # Save the figure fig_Model.savefig(f'./{newpath}/3DPlot_Model.pdf', format="pdf", bbox_inches='tight') plt.close(fig_Model) return # ------------------------------------------------------------------------- def _compute_pce_moments(self): """ Computes the first two moments using the PCE-based meta-model. Returns ------- None. 
""" MetaModel = self.MetaModel self.pce_means = {} self.pce_stds = {} for Outkey, ValuesDict in MetaModel.coeffs_dict.items(): pce_mean = np.zeros((len(ValuesDict))) pce_var = np.zeros((len(ValuesDict))) for Inkey, InIdxValues in ValuesDict.items(): idx = int(Inkey.split('_')[1]) - 1 coeffs = MetaModel.coeffs_dict[Outkey][Inkey] # Mean = c_0 if coeffs[0] != 0: pce_mean[idx] = coeffs[0] else: pce_mean[idx] = MetaModel.clf_poly[Outkey][Inkey].intercept_ # Var = sum(coeffs[1:]**2) pce_var[idx] = np.sum(np.square(coeffs[1:])) # Back transformation if PCA is selected. if MetaModel.dim_red_method.lower() == 'pca': PCA = MetaModel.pca[Outkey] self.pce_means[Outkey] = PCA.mean_ self.pce_means[Outkey] += np.dot(pce_mean, PCA.components_) self.pce_stds[Outkey] = np.sqrt(np.dot(pce_var, PCA.components_**2)) else: self.pce_means[Outkey] = pce_mean self.pce_stds[Outkey] = np.sqrt(pce_var) # Print a report table print("\n>>>>> Moments of {} <<<<<".format(Outkey)) print("\nIndex | Mean | Std. deviation") print('-'*35) print('\n'.join(f'{i+1} | {k:.3e} | {j:.3e}' for i, (k, j) in enumerate(zip(self.pce_means[Outkey], self.pce_stds[Outkey])))) print('-'*40) # ------------------------------------------------------------------------- def _get_sample(self, n_samples=None): """ Generates random samples taken from the input parameter space. Returns ------- samples : array of shape (n_samples, n_params) Generated samples. """ if n_samples is None: n_samples = self.n_samples PCEModel = self.MetaModel self.samples = PCEModel.ExpDesign.generate_samples(n_samples, 'random') return self.samples # ------------------------------------------------------------------------- def _eval_model(self, samples=None, key_str='Valid'): """ Evaluates Forward Model for the given number of self.samples or given samples. Parameters ---------- samples : array of shape (n_samples, n_params), optional Samples to evaluate the model at. The default is None. key_str : str, optional Key string pass to the model. The default is 'Valid'. Returns ------- model_outs : dict Dictionary of results. """ Model = self.MetaModel.ModelObj if samples is None: samples = self._get_sample() self.samples = samples else: self.n_samples = len(samples) model_outs, _ = Model.run_model_parallel(samples, key_str=key_str) return model_outs # ------------------------------------------------------------------------- def _plot_validation(self, save_fig=True): """ Plots outputs for visual comparison of metamodel outputs with that of the (full) original model. Parameters ---------- save_fig : bool, optional Save the plots. The default is True. Returns ------- None. 
""" PCEModel = self.MetaModel # get the samples x_val = self.samples y_pce_val = self.pce_out_mean y_val = self.model_out_dict # Open a pdf for the plots if save_fig: newpath = f'Outputs_PostProcessing_{self.name}/' if not os.path.exists(newpath): os.makedirs(newpath) # create a PdfPages object pdf1 = PdfPages(f'./{newpath}/Model_vs_PCEModel.pdf') fig = plt.figure() # Fit the data(train the model) for key in y_pce_val.keys(): y_pce_val_ = y_pce_val[key] y_val_ = y_val[key] regression_model = LinearRegression() regression_model.fit(y_pce_val_, y_val_) # Predict x_new = np.linspace(np.min(y_pce_val_), np.max(y_val_), 100) y_predicted = regression_model.predict(x_new[:, np.newaxis]) plt.scatter(y_pce_val_, y_val_, color='gold', linewidth=2) plt.plot(x_new, y_predicted, color='k') # Calculate the adjusted R_squared and RMSE # the total number of explanatory variables in the model # (not including the constant term) length_list = [] for key, value in PCEModel.coeffs_dict[key].items(): length_list.append(len(value)) n_predictors = min(length_list) n_samples = x_val.shape[0] R2 = r2_score(y_pce_val_, y_val_) AdjR2 = 1 - (1 - R2) * (n_samples - 1) / \ (n_samples - n_predictors - 1) rmse = mean_squared_error(y_pce_val_, y_val_, squared=False) plt.annotate(f'RMSE = {rmse:.3f}\n Adjusted $R^2$ = {AdjR2:.3f}', xy=(0.05, 0.85), xycoords='axes fraction') plt.ylabel("Original Model") plt.xlabel("PCE Model") plt.grid() plt.show() if save_fig: # save the current figure pdf1.savefig(fig, bbox_inches='tight') # Destroy the current plot plt.clf() # Close the pdfs pdf1.close() # ------------------------------------------------------------------------- def _plot_validation_multi(self, x_values=[], x_axis="x [m]", save_fig=True): """ Plots outputs for visual comparison of metamodel outputs with that of the (full) multioutput original model Parameters ---------- x_values : list or array, optional List of x values. The default is []. x_axis : str, optional Label of the x axis. The default is "x [m]". save_fig : bool, optional Whether to save the figures. The default is True. Returns ------- None. 
""" Model = self.MetaModel.ModelObj if save_fig: newpath = f'Outputs_PostProcessing_{self.name}/' if not os.path.exists(newpath): os.makedirs(newpath) # create a PdfPages object pdf = PdfPages(f'./{newpath}/Model_vs_PCEModel.pdf') # List of markers and colors color = cycle((['b', 'g', 'r', 'y', 'k'])) marker = cycle(('x', 'd', '+', 'o', '*')) fig = plt.figure() # Plot the model vs PCE model for keyIdx, key in enumerate(Model.Output.names): y_pce_val = self.pce_out_mean[key] y_pce_val_std = self.pce_out_std[key] y_val = self.model_out_dict[key] try: x = self.model_out_dict['x_values'][key] except IndexError: x = x_values for idx in range(y_val.shape[0]): Color = next(color) Marker = next(marker) plt.plot(x, y_val[idx], color=Color, marker=Marker, label='$Y_{%s}^M$'%(idx+1)) plt.plot(x, y_pce_val[idx], color=Color, marker=Marker, linestyle='--', label='$Y_{%s}^{PCE}$'%(idx+1)) plt.fill_between(x, y_pce_val[idx]-1.96*y_pce_val_std[idx], y_pce_val[idx]+1.96*y_pce_val_std[idx], color=Color, alpha=0.15) # Calculate the RMSE rmse = mean_squared_error(y_pce_val, y_val, squared=False) R2 = r2_score(y_pce_val[idx].reshape(-1, 1), y_val[idx].reshape(-1, 1)) plt.annotate(f'RMSE = {rmse:.3f}\n $R^2$ = {R2:.3f}', xy=(0.2, 0.75), xycoords='axes fraction') plt.ylabel(key) plt.xlabel(x_axis) plt.legend(loc='best') plt.grid() if save_fig: # save the current figure pdf.savefig(fig, bbox_inches='tight') # Destroy the current plot plt.clf() pdf.close() # Zip the subdirectories Model.zip_subdirs(f'{Model.name}valid', f'{Model.name}valid_')
Methods
def plot_moments(self, xlabel='Time [s]', plot_type=None, save_fig=True)
-
Plots the moments in a pdf format in the directory Outputs_PostProcessing.
Parameters
xlabel
:str, optional
- String to be displayed as the x-label. The default is 'Time [s]'.
plot_type
:str, optional
- Options: 'bar' or 'line'. The default is None.
save_fig
:bool, optional
- Whether to save the figure. The default is True.
Returns
pce_means
:dict
- Mean of the model outputs.
pce_stds
:dict
- Standard deviation of the model outputs.
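The plotted moments follow directly from the PCE coefficients (see _compute_pce_moments in the class source above): with an orthonormal basis, the mean is the constant coefficient and the variance is the sum of squares of the remaining coefficients. A minimal runnable sketch with stand-in coefficients:

import numpy as np

coeffs = np.array([2.5, 0.8, -0.3, 0.1])  # stand-in PCE coefficients
pce_mean = coeffs[0]                       # mean = c_0
pce_std = np.sqrt(np.sum(coeffs[1:]**2))   # std = sqrt(sum_{k>=1} c_k^2)
print(f"{pce_mean:.3f} +/- {pce_std:.3f}")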
def valid_metamodel(self, n_samples=1, samples=None, x_axis='Time [s]')
-
Evaluates the meta-model and the original model for the given samples, or for a given number of samples, and plots them against each other.
Parameters
n_samples
:int, optional
- Number of samples to be evaluated. The default is 1.
samples
:array of shape (n_samples, n_params), optional
- Samples to be evaluated. The default is None.
x_axis
:str, optional
- Label of the x-axis. The default is 'Time [s]'.
Returns
None.
def check_accuracy(self, n_samples=None, samples=None, outputs=None)
-
Checks accuracy of the metamodel by computing the root mean square error and validation error for all outputs.
Parameters
n_samples
:int, optional
- Number of samples. The default is None.
samples
:array of shape (n_samples, n_params), optional
- Parameter sets to be checked. The default is None.
outputs
:dict, optional
- Output dictionary with model outputs for all given output types in Model.Output.names. The default is None.
Raises
Exception
- When neither n_samples nor samples are provided.
Returns
rmse
:dict
- Root mean squared error for each output.
valid_error
:dict
- Validation error for each output.
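The validation error reported here normalizes the squared RMSE by the number of samples and by the sample variance of the model outputs, mirroring the expression in the class source. A runnable sketch with stand-in arrays (shapes and noise level are illustrative):

import numpy as np
from sklearn.metrics import mean_squared_error

rng = np.random.default_rng(2)
y_true = rng.standard_normal((20, 3))                  # 20 runs, 3 outputs
y_surr = y_true + 0.05 * rng.standard_normal((20, 3))  # surrogate responses

rmse = mean_squared_error(y_true, y_surr, squared=False,
                          multioutput='raw_values')
valid_error = (rmse**2 / y_true.shape[0]) / np.var(y_true, ddof=1, axis=0)
print(rmse, valid_error)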
def plot_seq_design_diagnostics(self, ref_BME_KLD=None, save_fig=True)
-
Plots the Bayesian Model Evidence (BME) and Kullback-Leibler divergence (KLD) for the sequential design.
Parameters
ref_BME_KLD
:array, optional
- Reference BME and KLD values. The default is None.
save_fig
:bool, optional
- Whether to save the figures. The default is True.
Returns
None.
def sobol_indices(self, xlabel='Time [s]', plot_type=None, save_fig=True)
-
Provides Sobol indices as a sensitivity measure to infer the importance of the input parameters. See Eq. 27 in [1] for more details. For the case with Principal component analysis refer to [2].
[1] Oladyshkin, S., de Barros, F.P. and Nowak, W. Global sensitivity analysis: A flexible and efficient framework with an example from stochastic hydrogeology. Advances in Water Resources. https://doi.org/10.1016/j.advwatres.2011.11.001
[2] Nagel, J.B., Rieckermann, J. and Sudret, B., 2020. Principal component analysis and sparse polynomial chaos expansions for global sensitivity analysis and model calibration: Application to urban drainage simulation. Reliability Engineering & System Safety, 195, p.106737.
Parameters
xlabel
:str, optional
- Label of the x-axis. The default is 'Time [s]'.
plot_type
:str, optional
- Plot type. The default is None, which corresponds to a line plot. A bar chart can be selected with 'bar'.
save_fig
:bool, optional
- Whether to save the figures. The default is True.
Returns
sobol_cell
:dict
- Sobol indices.
total_sobol
:dict
- Total Sobol indices.
Expand source code
def sobol_indices(self, xlabel='Time [s]', plot_type=None, save_fig=True):
    """
    Provides Sobol indices as a sensitivity measure to infer the importance
    of the input parameters. See Eq. 27 in [1] for more details. For the
    case with principal component analysis refer to [2].

    [1] Global sensitivity analysis: A flexible and efficient framework
        with an example from stochastic hydrogeology.
        S. Oladyshkin, F.P. de Barros, W. Nowak.
        https://doi.org/10.1016/j.advwatres.2011.11.001

    [2] Nagel, J.B., Rieckermann, J. and Sudret, B., 2020. Principal
        component analysis and sparse polynomial chaos expansions for
        global sensitivity analysis and model calibration: Application to
        urban drainage simulation. Reliability Engineering & System
        Safety, 195, p.106737.

    Parameters
    ----------
    xlabel : str, optional
        Label of the x-axis. The default is 'Time [s]'.
    plot_type : str, optional
        Plot type. The default is None, which corresponds to a line plot.
        A bar chart can be selected by passing 'bar'.
    save_fig : bool, optional
        Whether to save the figures. The default is True.

    Returns
    -------
    sobol_cell : dict
        Sobol indices.
    total_sobol : dict
        Total Sobol indices.
    """
    # Extract the necessary variables
    PCEModel = self.MetaModel
    basis_dict = PCEModel.basis_dict
    coeffs_dict = PCEModel.coeffs_dict
    n_params = PCEModel.n_params
    max_order = np.max(PCEModel.pce_deg)
    self.sobol_cell = {}
    self.total_sobol = {}

    for Output in PCEModel.ModelObj.Output.names:

        n_meas_points = len(coeffs_dict[Output])

        # Initialize the (cell) arrays containing the (total) Sobol indices
        sobol_array = dict.fromkeys(range(1, max_order+1), [])
        sobol_cell_array = dict.fromkeys(range(1, max_order+1), [])
        for i_order in range(1, max_order+1):
            n_comb = math.comb(n_params, i_order)
            sobol_cell_array[i_order] = np.zeros((n_comb, n_meas_points))
        total_sobol_array = np.zeros((n_params, n_meas_points))

        # Initialize the array storing the total variances
        TotalVariance = np.zeros((n_meas_points))

        # Loop over all measurement points and calculate Sobol indices
        for pIdx in range(n_meas_points):

            # Extract the basis indices (alpha) and coefficients
            Basis = basis_dict[Output][f'y_{pIdx+1}']
            try:
                clf_poly = PCEModel.clf_poly[Output][f'y_{pIdx+1}']
                PCECoeffs = clf_poly.coef_
            except:
                PCECoeffs = coeffs_dict[Output][f'y_{pIdx+1}']

            # Compute the total variance
            TotalVariance[pIdx] = np.sum(np.square(PCECoeffs[1:]))

            nzidx = np.where(PCECoeffs != 0)[0]
            # Set all the Sobol indices equal to zero in the presence of
            # a null output.
            if len(nzidx) == 0:
                # This is buggy.
                for i_order in range(1, max_order+1):
                    sobol_cell_array[i_order][:, pIdx] = 0
            # Otherwise compute them by summing well-chosen coefficients
            else:
                nz_basis = Basis[nzidx]
                for i_order in range(1, max_order+1):
                    idx = np.where(np.sum(nz_basis > 0, axis=1) == i_order)
                    subbasis = nz_basis[idx]
                    Z = np.array(list(combinations(range(n_params),
                                                   i_order)))

                    for q in range(Z.shape[0]):
                        Zq = Z[q]
                        subsubbasis = subbasis[:, Zq]
                        subidx = np.prod(subsubbasis, axis=1) > 0
                        sum_ind = nzidx[idx[0][subidx]]
                        if TotalVariance[pIdx] == 0.0:
                            sobol_cell_array[i_order][q, pIdx] = 0.0
                        else:
                            sobol = np.sum(np.square(PCECoeffs[sum_ind]))
                            sobol /= TotalVariance[pIdx]
                            sobol_cell_array[i_order][q, pIdx] = sobol

                # Compute the TOTAL Sobol indices
                for ParIdx in range(n_params):
                    idx = nz_basis[:, ParIdx] > 0
                    sum_ind = nzidx[idx]
                    if TotalVariance[pIdx] == 0.0:
                        total_sobol_array[ParIdx, pIdx] = 0.0
                    else:
                        sobol = np.sum(np.square(PCECoeffs[sum_ind]))
                        sobol /= TotalVariance[pIdx]
                        total_sobol_array[ParIdx, pIdx] = sobol

            # ----- if PCA selected: compute covariance -----
            if PCEModel.dim_red_method.lower() == 'pca':
                cov_Z_p_q = np.zeros((n_params))
                # Extract the basis indices (alpha) and coefficients of
                # the next component
                if pIdx < n_meas_points-1:
                    nextBasis = basis_dict[Output][f'y_{pIdx+2}']
                    try:
                        clf_poly = PCEModel.clf_poly[Output][f'y_{pIdx+2}']
                        nextPCECoeffs = clf_poly.coef_
                    except:
                        nextPCECoeffs = coeffs_dict[Output][f'y_{pIdx+2}']

                    # Choose the common non-zero basis
                    mask = (Basis[:, None] == nextBasis).all(-1).any(-1)
                    similar_basis = Basis[mask]

                    # Compute the TOTAL Sobol indices
                    for ParIdx in range(n_params):
                        idx = similar_basis[:, ParIdx] > 0
                        try:
                            sum_is = nzidx[idx]
                            cov_Z_p_q[ParIdx] = np.sum(
                                PCECoeffs[sum_ind] * nextPCECoeffs[sum_is])
                        except:
                            cov_Z_p_q[ParIdx] = 0.0

        # Compute the Sobol indices according to Ref. [2]
        if PCEModel.dim_red_method.lower() == 'pca':
            n_c_points = PCEModel.ExpDesign.Y[Output].shape[1]
            PCA = PCEModel.pca[Output]
            compPCA = PCA.components_
            nComp = compPCA.shape[0]
            var_Z_p = PCA.explained_variance_

            # Extract the Sobol indices of the components
            for i_order in range(1, max_order+1):
                n_comb = math.comb(n_params, i_order)
                sobol_array[i_order] = np.zeros((n_comb, n_c_points))
                Z = np.array(list(combinations(range(n_params), i_order)))

                for q in range(Z.shape[0]):
                    S_Z_i = sobol_cell_array[i_order][q]

                    for tIdx in range(n_c_points):
                        var_Y_t = np.var(
                            PCEModel.ExpDesign.Y[Output][:, tIdx])
                        if var_Y_t == 0.0:
                            term1, term2 = 0.0, 0.0
                        else:
                            term1 = np.sum([S_Z_i[i] * var_Z_p[i] *
                                            (compPCA[i, tIdx]**2) / var_Y_t
                                            for i in range(nComp)])

                            # Term 2
                            # cov_Z_p_q = np.ones((nComp))  # TODO: from coeffs
                            Phi_t_p = compPCA[:nComp-1]
                            Phi_t_q = compPCA
                            term2 = 2 * np.sum([cov_Z_p_q[ParIdx] *
                                                Phi_t_p[i, tIdx] *
                                                Phi_t_q[i, tIdx] / var_Y_t
                                                for i in range(nComp-1)])

                        sobol_array[i_order][q, tIdx] = term1  # + term2

            # Compute the TOTAL Sobol indices
            total_sobol = np.zeros((n_params, n_c_points))
            for ParIdx in range(n_params):
                S_Z_i = total_sobol_array[ParIdx]

                for tIdx in range(n_c_points):
                    var_Y_t = np.var(PCEModel.ExpDesign.Y[Output][:, tIdx])
                    if var_Y_t == 0.0:
                        term1, term2 = 0.0, 0.0
                    else:
                        term1 = 0
                        for i in range(nComp):
                            term1 += S_Z_i[i] * var_Z_p[i] * \
                                (compPCA[i, tIdx]**2) / var_Y_t

                        # Term 2
                        Phi_t_p = compPCA[:nComp-1]
                        Phi_t_q = compPCA
                        term2 = 0
                        for i in range(nComp-1):
                            term2 += cov_Z_p_q[ParIdx] * Phi_t_p[i, tIdx] \
                                * Phi_t_q[i, tIdx] / var_Y_t
                        term2 *= 2

                    total_sobol[ParIdx, tIdx] = term1 + term2

            self.sobol_cell[Output] = sobol_array
            self.total_sobol[Output] = total_sobol
        else:
            self.sobol_cell[Output] = sobol_cell_array
            self.total_sobol[Output] = total_sobol_array

    # ---------------- Plot -----------------------
    par_names = PCEModel.ExpDesign.par_names
    x_values_orig = PCEModel.ExpDesign.x_values

    cases = ['']
    for case in cases:
        newpath = f'Outputs_PostProcessing_{self.name}/'
        if not os.path.exists(newpath):
            os.makedirs(newpath)

        # File-name prefix (only non-empty for validation cases)
        name = case + '_' if 'Valid' in cases else ''

        if save_fig:
            # Create a PdfPages object
            pdf = PdfPages('./' + newpath + name + 'Sobol_indices.pdf')

        fig = plt.figure()

        for outIdx, Output in enumerate(PCEModel.ModelObj.Output.names):

            # Extract total Sobol indices
            total_sobol = self.total_sobol[Output]

            # Extract a list of x values
            if type(x_values_orig) is dict:
                x = x_values_orig[Output]
            else:
                x = x_values_orig

            if plot_type == 'bar':
                ax = fig.add_axes([0, 0, 1, 1])
                dict1 = {xlabel: x}
                dict2 = {param: sobolIndices for param, sobolIndices
                         in zip(par_names, total_sobol)}
                df = pd.DataFrame({**dict1, **dict2})
                df.plot(x=xlabel, y=par_names, kind="bar", ax=ax, rot=0,
                        colormap='Dark2')
                ax.set_ylabel('Total Sobol indices, $S^T$')
            else:
                for i, sobolIndices in enumerate(total_sobol):
                    plt.plot(x, sobolIndices, label=par_names[i],
                             marker='x', lw=2.5)
                plt.ylabel('Total Sobol indices, $S^T$')
                plt.xlabel(xlabel)

            plt.title(f'Sensitivity analysis of {Output}')
            if plot_type != 'bar':
                plt.legend(loc='best', frameon=True)

            # Save the indices
            np.savetxt(f'./{newpath}{name}totalsobol_' +
                       Output.replace('/', '_') + '.csv',
                       total_sobol.T, delimiter=',',
                       header=','.join(par_names), comments='')

            if save_fig:
                # Save the current figure and clear it for the next output
                pdf.savefig(fig, bbox_inches='tight')
                plt.clf()

        if save_fig:
            pdf.close()

    return self.sobol_cell, self.total_sobol
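A typical call, assuming post is an existing PostProcessing object wrapping a trained metamodel (the variable name is illustrative):

# post: a PostProcessing object built around a trained MetaModel
sobol_cell, total_sobol = post.sobol_indices(xlabel='Time [s]',
                                             plot_type='bar',
                                             save_fig=True)
# total_sobol[output_name] is an array of shape (n_params, n_meas_points);
# the indices are also written to CSV files in Outputs_PostProcessing_*/.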
def check_reg_quality(self, n_samples=1000, samples=None, save_fig=True)
-
Checks the quality of the metamodel for single-output models, based on: https://towardsdatascience.com/how-do-you-check-the-quality-of-your-regression-model-in-python-fa61759ff685
Parameters
n_samples : int, optional
    Number of parameter sets to use for the check. The default is 1000.
samples : array of shape (n_samples, n_params), optional
    Parameter sets to use for the check. The default is None.
save_fig : bool, optional
    Whether to save the figures. The default is True.
Returns
None.
Expand source code
def check_reg_quality(self, n_samples=1000, samples=None, save_fig=True):
    """
    Checks the quality of the metamodel for single-output models, based on:
    https://towardsdatascience.com/how-do-you-check-the-quality-of-your-regression-model-in-python-fa61759ff685

    Parameters
    ----------
    n_samples : int, optional
        Number of parameter sets to use for the check. The default is 1000.
    samples : array of shape (n_samples, n_params), optional
        Parameter sets to use for the check. The default is None.
    save_fig : bool, optional
        Whether to save the figures. The default is True.

    Returns
    -------
    None.
    """
    MetaModel = self.MetaModel

    if samples is None:
        self.n_samples = n_samples
        samples = self._get_sample()
    else:
        self.n_samples = samples.shape[0]

    # Evaluate the original and the surrogate model
    y_val = self._eval_model(samples, key_str='valid')
    y_pce_val, _ = MetaModel.eval_metamodel(samples=samples)

    # Open a pdf for the plots
    if save_fig:
        newpath = f'Outputs_PostProcessing_{self.name}/'
        if not os.path.exists(newpath):
            os.makedirs(newpath)

    # Run the diagnostics for every output
    for key in y_pce_val.keys():
        y_pce_val_ = y_pce_val[key]
        y_val_ = y_val[key]

        # ------ Residuals vs. predicting variables ------
        # Check the assumptions of linearity and independence
        fig1 = plt.figure()
        plt.title(key + ": Residuals vs. predicting variables")
        residuals = y_val_ - y_pce_val_
        plt.scatter(x=y_val_, y=residuals, color='blue', edgecolor='k')
        plt.grid(True)
        xmin, xmax = min(y_val_), max(y_val_)
        plt.hlines(y=0, xmin=xmin*0.9, xmax=xmax*1.1, color='red', lw=3,
                   linestyle='--')
        plt.xlabel(key)
        plt.ylabel('Residuals')
        plt.show()

        if save_fig:
            # Save the current figure
            fig1.savefig(f'./{newpath}/Residuals_vs_PredVariables.pdf',
                         bbox_inches='tight')
            # Destroy the current plot
            plt.clf()

        # ------ Fitted vs. residuals ------
        # Check the assumptions of linearity and independence
        fig2 = plt.figure()
        plt.title(key + ": Fitted vs. residuals")
        plt.scatter(x=y_pce_val_, y=residuals, color='blue', edgecolor='k')
        plt.grid(True)
        xmin, xmax = min(y_val_), max(y_val_)
        plt.hlines(y=0, xmin=xmin*0.9, xmax=xmax*1.1, color='red', lw=3,
                   linestyle='--')
        plt.xlabel(key)
        plt.ylabel('Residuals')
        plt.show()

        if save_fig:
            fig2.savefig(f'./{newpath}/Fitted_vs_Residuals.pdf',
                         bbox_inches='tight')
            plt.clf()

        # ------ Histogram of normalized residuals ------
        fig3 = plt.figure()
        resid_pearson = residuals / (max(residuals) - min(residuals))
        plt.hist(resid_pearson, bins=20, edgecolor='k')
        plt.ylabel('Count')
        plt.xlabel('Normalized residuals')
        plt.title(f"{key}: Histogram of normalized residuals")

        # Normality (Shapiro-Wilk) test of the residuals
        # A small p-value rejects the normality hypothesis.
        ax = plt.gca()
        _, p = stats.shapiro(residuals)
        if p < 0.01:
            annText = "The normality assumption may not hold."
        else:
            annText = "The residuals seem to come from a Gaussian process."
        at = AnchoredText(annText, prop=dict(size=30), frameon=True,
                          loc='upper left')
        at.patch.set_boxstyle("round,pad=0.,rounding_size=0.2")
        ax.add_artist(at)
        plt.show()

        if save_fig:
            fig3.savefig(f'./{newpath}/Hist_NormResiduals.pdf',
                         bbox_inches='tight')
            plt.clf()

        # ------ Q-Q plot of the normalized residuals ------
        plt.figure()
        fig4 = qqplot(resid_pearson, line='45', fit=True)
        plt.xticks()
        plt.yticks()
        plt.xlabel("Theoretical quantiles")
        plt.ylabel("Sample quantiles")
        plt.title(key + ": Q-Q plot of normalized residuals")
        plt.grid(True)
        plt.show()

        if save_fig:
            fig4.savefig(f'./{newpath}/QQPlot_NormResiduals.pdf',
                         bbox_inches='tight')
            plt.clf()
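A short usage sketch (again with post as an existing PostProcessing object; my_samples is an illustrative array, not part of the API):

# Draw 1000 parameter sets internally and produce all four diagnostics
post.check_reg_quality(n_samples=1000, save_fig=True)

# Or supply your own parameter sets of shape (n_samples, n_params)
post.check_reg_quality(samples=my_samples)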
def eval_pce_model_3d(self, save_fig=True)
-
Evaluates both the metamodel and the original model on a regular 2-D parameter grid and creates 3-D surface plots of the two responses. The evaluation grid is currently hard-coded for a two-parameter model (see the TODO note in the source below).
Expand source code
def eval_pce_model_3d(self, save_fig=True):

    self.n_samples = 1000
    PCEModel = self.MetaModel
    Model = self.MetaModel.ModelObj
    n_samples = self.n_samples

    # Create 3D-Grid
    # TODO: Make it general
    x = np.linspace(-5, 10, n_samples)
    y = np.linspace(0, 15, n_samples)
    X, Y = np.meshgrid(x, y)
    PCE_Z = np.zeros((self.n_samples, self.n_samples))
    Model_Z = np.zeros((self.n_samples, self.n_samples))

    for idxMesh in range(self.n_samples):
        sample_mesh = np.vstack((X[:, idxMesh], Y[:, idxMesh])).T

        univ_p_val = PCEModel.univ_basis_vals(sample_mesh)

        for Outkey, ValuesDict in PCEModel.coeffs_dict.items():

            pce_out_mean = np.zeros((len(sample_mesh), len(ValuesDict)))
            pce_out_std = np.zeros((len(sample_mesh), len(ValuesDict)))
            model_outs = np.zeros((len(sample_mesh), len(ValuesDict)))

            for Inkey, InIdxValues in ValuesDict.items():
                idx = int(Inkey.split('_')[1]) - 1
                basis_deg_ind = PCEModel.basis_dict[Outkey][Inkey]
                clf_poly = PCEModel.clf_poly[Outkey][Inkey]

                PSI_Val = PCEModel.create_psi(basis_deg_ind, univ_p_val)

                # Prediction with error bar
                y_mean, y_std = clf_poly.predict(PSI_Val, return_std=True)

                pce_out_mean[:, idx] = y_mean
                pce_out_std[:, idx] = y_std

                # Model evaluation
                model_out_dict, _ = Model.run_model_parallel(
                    sample_mesh, key_str='Valid3D')
                model_outs[:, idx] = model_out_dict[Outkey].T

            PCE_Z[:, idxMesh] = y_mean
            Model_Z[:, idxMesh] = model_outs[:, 0]

    # ---------------- 3D plot for PCEModel -----------------------
    fig_PCE = plt.figure()
    ax = plt.axes(projection='3d')
    ax.plot_surface(X, Y, PCE_Z, rstride=1, cstride=1,
                    cmap='viridis', edgecolor='none')
    ax.set_title('PCEModel')
    ax.set_xlabel('$x_1$')
    ax.set_ylabel('$x_2$')
    ax.set_zlabel('$f(x_1,x_2)$')
    plt.grid()
    plt.show()

    if save_fig:
        # Saving the figure
        newpath = f'Outputs_PostProcessing_{self.name}/'
        if not os.path.exists(newpath):
            os.makedirs(newpath)
        # Save the figure to file
        fig_PCE.savefig(f'./{newpath}/3DPlot_PCEModel.pdf', format="pdf",
                        bbox_inches='tight')
        plt.close(fig_PCE)

    # ---------------- 3D plot for Model -----------------------
    fig_Model = plt.figure()
    ax = plt.axes(projection='3d')
    ax.plot_surface(X, Y, Model_Z, rstride=1, cstride=1,
                    cmap='viridis', edgecolor='none')
    ax.set_title('Model')
    ax.set_xlabel('$x_1$')
    ax.set_ylabel('$x_2$')
    ax.set_zlabel('$f(x_1,x_2)$')
    plt.grid()
    plt.show()

    if save_fig:
        # Save the figure
        fig_Model.savefig(f'./{newpath}/3DPlot_Model.pdf', format="pdf",
                          bbox_inches='tight')
        plt.close(fig_Model)

    return
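Usage is a single call; note the hard-coded two-parameter grid mentioned in the TODO above:

# Writes 3DPlot_PCEModel.pdf and 3DPlot_Model.pdf for a 2-parameter model
post.eval_pce_model_3d(save_fig=True)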
class BayesInference (MetaModel, discrepancy=None, emulator=True, name='Calib', bootstrap=False, req_outputs=None, selected_indices=None, samples=None, n_samples=500000, measured_data=None, inference_method='rejection', mcmc_params=None, bayes_loocv=False, n_bootstrap_itrs=1, perturbed_data=[], bootstrap_noise=0.05, plot_post_pred=True, plot_map_pred=False, max_a_posteriori='mean', corner_title_fmt='.3e')
-
A class to perform Bayesian Analysis.
Attributes
MetaModel : obj
    Meta model object.
discrepancy : obj
    The discrepancy object for the sigma2s, i.e. the diagonal entries of
    the variance matrix for a multivariate normal likelihood.
name : str, optional
    The type of analysis, either calibration ('Calib') or validation
    ('Valid'). The default is 'Calib'.
emulator : bool, optional
    Analysis with emulator (MetaModel). The default is True.
bootstrap : bool, optional
    Bootstrap the analysis. The default is False.
req_outputs : list, optional
    The list of requested outputs to be used for the analysis. The
    default is None. If None, all the defined outputs of the model
    object are used.
selected_indices : dict, optional
    A dictionary with the selected indices of each model output. The
    default is None. If None, all measurement points are used in the
    analysis.
samples : array of shape (n_samples, n_params), optional
    The samples to be used in the analysis. The default is None. If
    None, the samples are drawn from the probabilistic input parameter
    object of the MetaModel object.
n_samples : int, optional
    Number of samples to be used in the analysis. The default is
    500000. If samples is not None, this argument is set to the number
    of samples given.
measured_data : dict, optional
    A dictionary containing the observation data. The default is None.
    If None, the observation defined in the Model object of the
    MetaModel is used.
inference_method : str, optional
    A method for approximating the posterior distribution in the
    Bayesian inference step. The default is 'rejection', which stands
    for rejection sampling. A Markov chain Monte Carlo sampler can be
    selected by passing 'MCMC'.
mcmc_params : dict, optional
    A dictionary with the arguments required for Bayesian inference
    with MCMC. The default is None.

    Pass the mcmc_params like the following:

    >>> mcmc_params = {
    ...     'init_samples': None,      # initial samples
    ...     'n_walkers': 100,          # number of walkers (chains)
    ...     'n_steps': 100000,         # maximum number of steps
    ...     'n_burn': 200,             # number of burn-in steps
    ...     'moves': None,             # moves for the emcee sampler
    ...     'multiprocessing': False,  # multiprocessing
    ...     'verbose': False           # verbosity
    ...     }

    The items shown above are the default values. If any parameter is
    not defined, the default value is assigned to it.
bayes_loocv : bool, optional
    Bayesian leave-one-out cross-validation. The default is False. If
    True, the LOOCV procedure is used to estimate the Bayesian Model
    Evidence (BME).
n_bootstrap_itrs : int, optional
    Number of bootstrap iterations. The default is 1. If bayes_loocv is
    True, this is set equal to the total length of the observation data
    set.
perturbed_data : array of shape (n_bootstrap_itrs, n_obs), optional
    User-defined perturbed data. The default is [].
bootstrap_noise : float, optional
    A noise level to perturb the data set. The default is 0.05.
plot_post_pred : bool, optional
    Plot posterior predictive plots. The default is True.
plot_map_pred : bool, optional
    Plot the model outputs vs the metamodel predictions for the maximum
    a posteriori (defined as max_a_posteriori) parameter set. The
    default is False.
max_a_posteriori : str, optional
    Maximum a posteriori. 'mean' and 'mode' are available. The default
    is 'mean'.
corner_title_fmt : str, optional
    Title format for the posterior distribution plot with the Python
    package corner. The default is '.3e'.
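A minimal calibration sketch, assuming a trained metamodel meta_model and a configured Discrepancy object disc (both names are illustrative; see the respective class documentation):

from bayesvalidrox import BayesInference

bayes = BayesInference(
    meta_model,                    # trained MetaModel object
    discrepancy=disc,              # measurement/model error description
    name='Calib',                  # calibration run
    emulator=True,                 # evaluate the surrogate, not the solver
    inference_method='rejection',  # or 'MCMC' together with mcmc_params
    n_samples=100000,
)
bayes = bayes.create_inference()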
Expand source code
class BayesInference: """ A class to perform Bayesian Analysis. Attributes ---------- MetaModel : obj Meta model object. discrepancy : obj The discrepancy object for the sigma2s, i.e. the diagonal entries of the variance matrix for a multivariate normal likelihood. name : str, optional The type of analysis, either calibration (`Calib`) or validation (`Valid`). The default is `'Calib'`. emulator : bool, optional Analysis with emulator (MetaModel). The default is `True`. bootstrap : bool, optional Bootstrap the analysis. The default is `False`. req_outputs : list, optional The list of requested output to be used for the analysis. The default is `None`. If None, all the defined outputs for the model object is used. selected_indices : dict, optional A dictionary with the selected indices of each model output. The default is `None`. If `None`, all measurement points are used in the analysis. samples : array of shape (n_samples, n_params), optional The samples to be used in the analysis. The default is `None`. If None the samples are drawn from the probablistic input parameter object of the MetaModel object. n_samples : int, optional Number of samples to be used in the analysis. The default is `500000`. If samples is not `None`, this argument will be assigned based on the number of samples given. measured_data : dict, optional A dictionary containing the observation data. The default is `None`. if `None`, the observation defined in the Model object of the MetaModel is used. inference_method : str, optional A method for approximating the posterior distribution in the Bayesian inference step. The default is `'rejection'`, which stands for rejection sampling. A Markov Chain Monte Carlo sampler can be simply selected by passing `'MCMC'`. mcmc_params : dict, optional A dictionary with args required for the Bayesian inference with `MCMC`. The default is `None`. Pass the mcmc_params like the following: >>> mcmc_params:{ 'init_samples': None, # initial samples 'n_walkers': 100, # number of walkers (chain) 'n_steps': 100000, # number of maximum steps 'n_burn': 200, # number of burn-in steps 'moves': None, # Moves for the emcee sampler 'multiprocessing': False, # multiprocessing 'verbose': False # verbosity } The items shown above are the default values. If any parmeter is not defined, the default value will be assigned to it. bayes_loocv : bool, optional Bayesian Leave-one-out Cross Validation. The default is `False`. If `True`, the LOOCV procedure is used to estimate the bayesian Model Evidence (BME). n_bootstrap_itrs : int, optional Number of bootstrap iteration. The default is `1`. If bayes_loocv is `True`, this is qualt to the total length of the observation data set. perturbed_data : array of shape (n_bootstrap_itrs, n_obs), optional User defined perturbed data. The default is `[]`. bootstrap_noise : float, optional A noise level to perturb the data set. The default is `0.05`. plot_post_pred : bool, optional Plot posterior predictive plots. The default is `True`. plot_map_pred : bool, optional Plot the model outputs vs the metamodel predictions for the maximum a posteriori (defined as `max_a_posteriori`) parameter set. The default is `False`. max_a_posteriori : str, optional Maximum a posteriori. `'mean'` and `'mode'` are available. The default is `'mean'`. corner_title_fmt : str, optional Title format for the posterior distribution plot with python package `corner`. The default is `'.3e'`. 
""" def __init__(self, MetaModel, discrepancy=None, emulator=True, name='Calib', bootstrap=False, req_outputs=None, selected_indices=None, samples=None, n_samples=500000, measured_data=None, inference_method='rejection', mcmc_params=None, bayes_loocv=False, n_bootstrap_itrs=1, perturbed_data=[], bootstrap_noise=0.05, plot_post_pred=True, plot_map_pred=False, max_a_posteriori='mean', corner_title_fmt='.3e'): self.MetaModel = MetaModel self.Discrepancy = discrepancy self.emulator = emulator self.name = name self.bootstrap = bootstrap self.req_outputs = req_outputs self.selected_indices = selected_indices self.samples = samples self.n_samples = n_samples self.measured_data = measured_data self.inference_method = inference_method self.mcmc_params = mcmc_params self.perturbed_data = perturbed_data self.bayes_loocv = bayes_loocv self.n_bootstrap_itrs = n_bootstrap_itrs self.bootstrap_noise = bootstrap_noise self.plot_post_pred = plot_post_pred self.plot_map_pred = plot_map_pred self.max_a_posteriori = max_a_posteriori self.corner_title_fmt = corner_title_fmt # ------------------------------------------------------------------------- def create_inference(self): """ Starts the inference. Returns ------- BayesInference : obj The Bayes inference object. """ # Set some variables MetaModel = self.MetaModel Model = MetaModel.ModelObj n_params = MetaModel.n_params output_names = Model.Output.names par_names = MetaModel.ExpDesign.par_names # If the prior is set by the user, take it. if self.samples is None: self.samples = MetaModel.ExpDesign.generate_samples( self.n_samples, 'random') else: try: samples = self.samples.values except AttributeError: samples = self.samples # Take care of an additional Sigma2s self.samples = samples[:, :n_params] # Update number of samples self.n_samples = self.samples.shape[0] # ---------- Preparation of observation data ---------- # Read observation data and perturb it if requested. 
if self.measured_data is None: self.measured_data = Model.read_observation(case=self.name) # Convert measured_data to a data frame if not isinstance(self.measured_data, pd.DataFrame): self.measured_data = pd.DataFrame(self.measured_data) # Extract the total number of measurement points if self.name.lower() == 'calib': self.n_tot_measurement = Model.n_obs else: self.n_tot_measurement = Model.n_obs_valid # Find measurement error (if not given) for post predictive plot if not hasattr(self, 'measurement_error'): if isinstance(self.Discrepancy, dict): Disc = self.Discrepancy['known'] else: Disc = self.Discrepancy if isinstance(Disc.parameters, dict): self.measurement_error = {k: np.sqrt(Disc.parameters[k]) for k in Disc.parameters.keys()} else: try: self.measurement_error = np.sqrt(Disc.parameters) except TypeError: pass # ---------- Preparation of variance for covariance matrix ---------- # Independent and identically distributed total_sigma2 = dict() opt_sigma_flag = isinstance(self.Discrepancy, dict) opt_sigma = None for key_idx, key in enumerate(output_names): # Find opt_sigma if opt_sigma_flag and opt_sigma is None: # Option A: known error with unknown bias term opt_sigma = 'A' known_discrepancy = self.Discrepancy['known'] self.Discrepancy = self.Discrepancy['infer'] sigma2 = np.array(known_discrepancy.parameters[key]) elif opt_sigma == 'A' or self.Discrepancy.parameters is not None: # Option B: The sigma2 is known (no bias term) if opt_sigma == 'A': sigma2 = np.array(known_discrepancy.parameters[key]) else: opt_sigma = 'B' sigma2 = np.array(self.Discrepancy.parameters[key]) elif not isinstance(self.Discrepancy.InputDisc, str): # Option C: The sigma2 is unknown (bias term including error) opt_sigma = 'C' self.Discrepancy.opt_sigma = opt_sigma n_measurement = self.measured_data[key].values.shape sigma2 = np.zeros((n_measurement[0])) total_sigma2[key] = sigma2 self.Discrepancy.opt_sigma = opt_sigma self.Discrepancy.total_sigma2 = total_sigma2 # If inferred sigma2s obtained from e.g. 
calibration are given try: self.sigma2s = self.Discrepancy.get_sample(self.n_samples) except: pass # ---------------- Bootstrap & TOM -------------------- if self.bootstrap or self.bayes_loocv: if len(self.perturbed_data) == 0: # zero mean noise Adding some noise to the observation function self.perturbed_data = self._perturb_data( self.measured_data, output_names ) else: self.n_bootstrap_itrs = len(self.perturbed_data) # -------- Model Discrepancy ----------- if hasattr(self, 'error_model') and self.error_model \ and self.name.lower() != 'calib': # Select posterior mean as MAP MAP_theta = self.samples.mean(axis=0).reshape((1, n_params)) # MAP_theta = stats.mode(self.samples,axis=0)[0] # Evaluate the (meta-)model at the MAP y_MAP, y_std_MAP = MetaModel.eval_metamodel(samples=MAP_theta) # Train a GPR meta-model using MAP self.error_MetaModel = MetaModel.create_model_error( self.bias_inputs, y_MAP, Name=self.name ) # ----------------------------------------------------- # ----- Loop over the perturbed observation data ------ # ----------------------------------------------------- # Initilize arrays logLikelihoods = np.zeros((self.n_samples, self.n_bootstrap_itrs), dtype=np.float16) BME_Corr = np.zeros((self.n_bootstrap_itrs)) log_BME = np.zeros((self.n_bootstrap_itrs)) KLD = np.zeros((self.n_bootstrap_itrs)) inf_entropy = np.zeros((self.n_bootstrap_itrs)) # Compute the prior predtions # Evaluate the MetaModel if self.emulator: y_hat, y_std = MetaModel.eval_metamodel(samples=self.samples) self.__mean_pce_prior_pred = y_hat self._std_pce_prior_pred = y_std # Correct the predictions with Model discrepancy if hasattr(self, 'error_model') and self.error_model: y_hat_corr, y_std = self.error_MetaModel.eval_model_error( self.bias_inputs, self.__mean_pce_prior_pred ) self.__mean_pce_prior_pred = y_hat_corr self._std_pce_prior_pred = y_std # Surrogate model's error using RMSE of test data if hasattr(MetaModel, 'rmse'): surrError = MetaModel.rmse else: surrError = None else: # Evaluate the original model self.__model_prior_pred = self._eval_model( samples=self.samples, key='PriorPred' ) # Start the likelihood-BME computations for the perturbed data for itr_idx, data in tqdm( enumerate(self.perturbed_data), ascii=True, desc="Boostraping the BME calculations" ): # ---------------- Likelihood calculation ---------------- if self.emulator: model_evals = self.__mean_pce_prior_pred else: model_evals = self.__model_prior_pred # Leave one out if self.bayes_loocv: self.selected_indices = np.nonzero(data)[0] # Prepare data dataframe nobs = list(self.measured_data.count().values[1:]) numbers = list(map(sum, zip([0] + nobs, nobs))) indices = list(zip([0] + numbers, numbers)) data_dict = { output_names[i]: data[j:k] for i, (j, k) in enumerate(indices) } # Unknown sigma2 if opt_sigma == 'C' or hasattr(self, 'sigma2s'): logLikelihoods[:, itr_idx] = self.normpdf( model_evals, data_dict, total_sigma2, sigma2=self.sigma2s, std=surrError ) else: # known sigma2 logLikelihoods[:, itr_idx] = self.normpdf( model_evals, data_dict, total_sigma2, std=surrError ) # ---------------- BME Calculations ---------------- # BME (log) log_BME[itr_idx] = np.log( np.nanmean(np.exp(logLikelihoods[:, itr_idx], dtype=np.float128)) ) # Rejection Step # Random numbers between 0 and 1 unif = np.random.rand(1, self.n_samples)[0] # Reject the poorly performed prior Likelihoods = np.exp(logLikelihoods[:, itr_idx], dtype=np.float64) accepted = (Likelihoods/np.max(Likelihoods)) >= unif posterior = self.samples[accepted] # Posterior-based 
expectation of likelihoods postExpLikelihoods = np.mean( logLikelihoods[:, itr_idx][accepted] ) # Posterior-based expectation of prior densities postExpPrior = np.mean( np.log([MetaModel.ExpDesign.JDist.pdf(posterior.T)]) ) # Calculate Kullback-Leibler Divergence KLD[itr_idx] = postExpLikelihoods - log_BME[itr_idx] # Information Entropy based on Entropy paper Eq. 38 inf_entropy[itr_idx] = log_BME[itr_idx] - postExpPrior - \ postExpLikelihoods # TODO: BME correction when using Emulator # if self.emulator: # BME_Corr[itr_idx] = self._corr_factor_BME( # data, total_sigma2, posterior # ) # Clear memory gc.collect(generation=2) # ---------------- Store BME, Likelihoods for all ---------------- # Likelihoods (Size: n_samples, n_bootstrap_itr) self.log_likes = logLikelihoods # BME (log), KLD, infEntropy (Size: 1,n_bootstrap_itr) self.log_BME = log_BME self.KLD = KLD self.inf_entropy = inf_entropy # TODO: BMECorrFactor (log) (Size: 1,n_bootstrap_itr) # if self.emulator: self.BMECorrFactor = BME_Corr # BME = BME + BMECorrFactor if self.emulator: self.log_BME = self.log_BME # + self.BMECorrFactor # ---------------- Parameter Bayesian inference ---------------- if self.inference_method.lower() == 'mcmc': # Instantiate the MCMC object MCMC_Obj = MCMC(self) self.posterior_df = MCMC_Obj.run_sampler( self.measured_data, total_sigma2 ) elif self.name.lower() == 'valid': # Convert to a dataframe if samples are provided after calibration. self.posterior_df = pd.DataFrame(self.samples, columns=par_names) else: # Rejection sampling self.posterior_df = self._rejection_sampling() # Provide posterior's summary print('\n') print('-'*15 + 'Posterior summary' + '-'*15) pd.options.display.max_columns = None pd.options.display.max_rows = None print(self.posterior_df.describe()) print('-'*50) # -------- Model Discrepancy ----------- if hasattr(self, 'error_model') and self.error_model \ and self.name.lower() == 'calib': if self.inference_method.lower() == 'mcmc': self.error_MetaModel = MCMC_Obj.error_MetaModel else: # Select posterior mean as MAP if opt_sigma == "B": posterior_df = self.posterior_df.values else: posterior_df = self.posterior_df.values[:, :-Model.n_outputs] # Select posterior mean as Maximum a posteriori map_theta = posterior_df.mean(axis=0).reshape((1, n_params)) # map_theta = stats.mode(Posterior_df,axis=0)[0] # Evaluate the (meta-)model at the MAP y_MAP, y_std_MAP = MetaModel.eval_metamodel(samples=map_theta) # Train a GPR meta-model using MAP self.error_MetaModel = MetaModel.create_model_error( self.bias_inputs, y_MAP, Name=self.name ) # -------- Posterior perdictive ----------- self._posterior_predictive() # ----------------------------------------------------- # ------------------ Visualization -------------------- # ----------------------------------------------------- # Create Output directory, if it doesn't exist already. 
out_dir = f'Outputs_Bayes_{Model.name}_{self.name}' os.makedirs(out_dir, exist_ok=True) # -------- Posteior parameters -------- if opt_sigma != "B": par_names.extend( [self.Discrepancy.InputDisc.Marginals[i].name for i in range(len(self.Discrepancy.InputDisc.Marginals))] ) # Pot with corner figPosterior = corner.corner(self.posterior_df.to_numpy(), labels=par_names, quantiles=[0.15, 0.5, 0.85], show_titles=True, title_fmt=self.corner_title_fmt, labelpad=0.2, use_math_text=True, title_kwargs={"fontsize": 28}, plot_datapoints=False, plot_density=False, fill_contours=True, smooth=0.5, smooth1d=0.5) # Loop over axes and set x limits if opt_sigma == "B": axes = np.array(figPosterior.axes).reshape( (len(par_names), len(par_names)) ) for yi in range(len(par_names)): ax = axes[yi, yi] ax.set_xlim(MetaModel.bound_tuples[yi]) for xi in range(yi): ax = axes[yi, xi] ax.set_xlim(MetaModel.bound_tuples[xi]) # Turn off gridlines for ax in figPosterior.axes: ax.grid(False) if self.emulator: plotname = f'/Posterior_Dist_{Model.name}_emulator' else: plotname = f'/Posterior_Dist_{Model.name}' figPosterior.set_size_inches((24, 16)) figPosterior.savefig(f'./{out_dir}{plotname}.pdf', bbox_inches='tight') # -------- Plot MAP -------- if self.plot_map_pred: self._plot_max_a_posteriori() # -------- Plot log_BME dist -------- if self.bootstrap and self.n_bootstrap_itrs > 1: # Computing the TOM performance self.log_BME_tom = stats.chi2.rvs( self.n_tot_measurement, size=self.log_BME.shape[0] ) fig, ax = plt.subplots() sns.kdeplot(self.log_BME_tom, ax=ax, color="green", shade=True) sns.kdeplot( self.log_BME, ax=ax, color="blue", shade=True, label='Model BME') ax.set_xlabel('log$_{10}$(BME)') ax.set_ylabel('Probability density') legend_elements = [ Patch(facecolor='green', edgecolor='green', label='TOM BME'), Patch(facecolor='blue', edgecolor='blue', label='Model BME') ] ax.legend(handles=legend_elements) if self.emulator: plotname = f'/BME_hist_{Model.name}_emulator' else: plotname = f'/BME_hist_{Model.name}' plt.savefig(f'./{out_dir}{plotname}.pdf', bbox_inches='tight') plt.show() plt.close() # -------- Posteior perdictives -------- if self.plot_post_pred: # Plot the posterior predictive self._plot_post_predictive() return self # ------------------------------------------------------------------------- def _perturb_data(self, data, output_names): """ Returns an array with n_bootstrap_itrs rowsof perturbed data. The first row includes the original observation data. If `self.bayes_loocv` is True, a 2d-array will be returned with repeated rows and zero diagonal entries. Parameters ---------- data : pandas DataFrame Observation data. output_names : list List of the output names. Returns ------- final_data : array Perturbed data set. 
""" noise_level = self.bootstrap_noise obs_data = data[output_names].values n_measurement, n_outs = obs_data.shape self.n_tot_measurement = obs_data[~np.isnan(obs_data)].shape[0] # Number of bootstrap iterations if self.bayes_loocv: self.n_bootstrap_itrs = self.n_tot_measurement # Pass loocv dataset if self.bayes_loocv: obs = obs_data.T[~np.isnan(obs_data.T)] final_data = np.repeat(np.atleast_2d(obs), self.n_bootstrap_itrs, axis=0) np.fill_diagonal(final_data, 0) return final_data else: final_data = np.zeros( (self.n_bootstrap_itrs, self.n_tot_measurement) ) final_data[0] = obs_data.T[~np.isnan(obs_data.T)] for itrIdx in range(1, self.n_bootstrap_itrs): data = np.zeros((n_measurement, n_outs)) for idx in range(len(output_names)): std = np.nanstd(obs_data[:, idx]) if std == 0: std = 0.001 noise = std * noise_level data[:, idx] = np.add( obs_data[:, idx], np.random.normal(0, 1, obs_data.shape[0]) * noise, ) final_data[itrIdx] = data.T[~np.isnan(data.T)] return final_data # ------------------------------------------------------------------------- def _logpdf(self, x, mean, cov): """ computes the likelihood based on a multivariate normal distribution. Parameters ---------- x : TYPE DESCRIPTION. mean : array_like Observation data. cov : 2d array Covariance matrix of the distribution. Returns ------- log_lik : float Log likelihood. """ n = len(mean) L = spla.cholesky(cov, lower=True) beta = np.sum(np.log(np.diag(L))) dev = x - mean alpha = dev.dot(spla.cho_solve((L, True), dev)) log_lik = -0.5 * alpha - beta - n / 2. * np.log(2 * np.pi) return log_lik # ------------------------------------------------------------------------- def _eval_model(self, samples=None, key='MAP'): """ Evaluates Forward Model. Parameters ---------- samples : array of shape (n_samples, n_params), optional Parameter sets. The default is None. key : str, optional Key string to be passed to the run_model_parallel method. The default is 'MAP'. Returns ------- model_outputs : TYPE DESCRIPTION. """ MetaModel = self.MetaModel Model = MetaModel.ModelObj if samples is None: self.samples = MetaModel.ExpDesign.generate_samples( self.n_samples, 'random') else: self.samples = samples self.n_samples = len(samples) model_outputs, _ = Model.run_model_parallel( self.samples, key_str=key+self.name) # Clean up # Zip the subdirectories try: dir_name = f'{Model.name}MAP{self.name}' key = dir_name + '_' Model.zip_subdirs(dir_name, key) except: pass return model_outputs # ------------------------------------------------------------------------- def _kernel_rbf(self, X, hyperparameters): """ Isotropic squared exponential kernel. Higher l values lead to smoother functions and therefore to coarser approximations of the training data. Lower l values make functions more wiggly with wide uncertainty regions between training data points. sigma_f controls the marginal variance of b(x) Parameters ---------- X : ndarray of shape (n_samples_X, n_features) hyperparameters : Dict Lambda characteristic length sigma_f controls the marginal variance of b(x) sigma_0 unresolvable error nugget term, interpreted as random error that cannot be attributed to measurement error. Returns ------- var_cov_matrix : ndarray of shape (n_samples_X,n_samples_X) Kernel k(X, X). 
""" from sklearn.gaussian_process.kernels import RBF min_max_scaler = preprocessing.MinMaxScaler() X_minmax = min_max_scaler.fit_transform(X) nparams = len(hyperparameters) # characteristic length (0,1] Lambda = hyperparameters[0] # sigma_f controls the marginal variance of b(x) sigma2_f = hyperparameters[1] # cov_matrix = sigma2_f*rbf_kernel(X_minmax, gamma = 1/Lambda**2) rbf = RBF(length_scale=Lambda) cov_matrix = sigma2_f * rbf(X_minmax) if nparams > 2: # (unresolvable error) nugget term that is interpreted as random # error that cannot be attributed to measurement error. sigma2_0 = hyperparameters[2:] for i, j in np.ndindex(cov_matrix.shape): cov_matrix[i, j] += np.sum(sigma2_0) if i == j else 0 return cov_matrix # ------------------------------------------------------------------------- def normpdf(self, outputs, obs_data, total_sigma2s, sigma2=None, std=None): """ Calculates the likelihood of simulation outputs compared with observation data. Parameters ---------- outputs : dict A dictionary containing the simulation outputs as array of shape (n_samples, n_measurement) for each model output. obs_data : dict A dictionary/dataframe containing the observation data. total_sigma2s : dict A dictionary with known values of the covariance diagonal entries, a.k.a sigma^2. sigma2 : array, optional An array of the sigma^2 samples, when the covariance diagonal entries are unknown and are being jointly inferred. The default is None. std : dict, optional A dictionary containing the root mean squared error as array of shape (n_samples, n_measurement) for each model output. The default is None. Returns ------- logLik : array of shape (n_samples) Likelihoods. """ Model = self.MetaModel.ModelObj logLik = 0.0 # Extract the requested model outputs for likelihood calulation if self.req_outputs is None: req_outputs = Model.Output.names else: req_outputs = list(self.req_outputs) # Loop over the outputs for idx, out in enumerate(req_outputs): # (Meta)Model Output nsamples, nout = outputs[out].shape # Prepare data and remove NaN try: data = obs_data[out].values[~np.isnan(obs_data[out])] except AttributeError: data = obs_data[out][~np.isnan(obs_data[out])] # Prepare sigma2s non_nan_indices = ~np.isnan(total_sigma2s[out]) tot_sigma2s = total_sigma2s[out][non_nan_indices][:nout] # Add the std of the PCE is chosen as emulator. 
if self.emulator: if std is not None: std_pce = std[out] else: std_pce = np.mean( self._std_pce_prior_pred[out], axis=0) # Expected value of variance (Assump: i.i.d stds) tot_sigma2s += std_pce**2 # If sigma2 is not given, use given total_sigma2s if sigma2 is None: logLik += stats.multivariate_normal.logpdf( outputs[out], data, np.diag(tot_sigma2s)) continue # Loop over each run/sample and calculate logLikelihood logliks = np.zeros(nsamples) for s_idx in range(nsamples): # Simulation run tot_outputs = outputs[out] # Covariance Matrix covMatrix = np.diag(tot_sigma2s) if sigma2 is not None: # Check the type error term if hasattr(self, 'bias_inputs') and \ not hasattr(self, 'error_model'): # Infer a Bias model usig Gaussian Process Regression bias_inputs = np.hstack( (self.bias_inputs[out], tot_outputs[s_idx].reshape(-1, 1))) params = sigma2[s_idx, idx*3:(idx+1)*3] covMatrix = self._kernel_rbf(bias_inputs, params) else: # Infer equal sigma2s try: sigma_2 = sigma2[s_idx, idx] except TypeError: sigma_2 = 0.0 covMatrix += sigma_2 * np.eye(nout) # covMatrix = np.diag(sigma2 * total_sigma2s) # Select the data points to compare if self.selected_indices is not None: indices = self.selected_indices[out] covMatrix = np.diag(covMatrix[indices, indices]) else: indices = list(range(nout)) # Compute loglikelihood logliks[s_idx] = self._logpdf( tot_outputs[s_idx, indices], data[indices], covMatrix ) logLik += logliks return logLik # ------------------------------------------------------------------------- def _corr_factor_BME(self, Data, total_sigma2s, posterior): """ Calculates the correction factor for BMEs. """ MetaModel = self.MetaModel OrigModelOutput = MetaModel.ExpDesign.Y Model = MetaModel.ModelObj # Posterior with guassian-likelihood postDist = stats.gaussian_kde(posterior.T) # Remove NaN Data = Data[~np.isnan(Data)] total_sigma2s = total_sigma2s[~np.isnan(total_sigma2s)] # Covariance Matrix covMatrix = np.diag(total_sigma2s[:self.n_tot_measurement]) # Extract the requested model outputs for likelihood calulation if self.req_outputs is None: OutputType = Model.Output.names else: OutputType = list(self.req_outputs) # SampleSize = OrigModelOutput[OutputType[0]].shape[0] # Flatten the OutputType for OrigModel TotalOutputs = np.concatenate([OrigModelOutput[x] for x in OutputType], 1) NrofBayesSamples = self.n_samples # Evaluate MetaModel on the experimental design Samples = MetaModel.ExpDesign.X OutputRS, stdOutputRS = MetaModel.eval_metamodel(samples=Samples) # Reset the NrofSamples to NrofBayesSamples self.n_samples = NrofBayesSamples # Flatten the OutputType for MetaModel TotalPCEOutputs = np.concatenate([OutputRS[x] for x in OutputRS], 1) TotalPCEstdOutputRS= np.concatenate([stdOutputRS[x] for x in stdOutputRS], 1) logweight = 0 for i,sample in enumerate(Samples): # Compute likelilhood output vs RS covMatrix = np.diag(TotalPCEstdOutputRS[i]**2) logLik = self._logpdf(TotalOutputs[i], TotalPCEOutputs[i], covMatrix) # Compute posterior likelihood of the collocation points logpostLik = np.log(postDist.pdf(sample[:,None]))[0] if logpostLik != -np.inf: logweight += logLik + logpostLik return logweight # # Initialization # covMatrix=np.zeros((NofMeasurements, NofMeasurements), float) # BME_RM_Model_Weight = np.zeros((SampleSize)) # BME_RM_Data_Weight = np.zeros((SampleSize)) # BME_Corr = np.zeros((1)) # # Deviation Computations # RM_Model_Deviation = np.zeros((SampleSize,NofMeasurements)) # RM_Data_Deviation = np.zeros((SampleSize,NofMeasurements)) # for i in range(SampleSize): # RM_Model_Deviation[i] = 
TotalOutputs[i][:NofMeasurements] - TotalPCEOutputs[i, :] # Reduce model- Full Model # RM_Data_Deviation[i] = Observations - TotalPCEOutputs[i, :] # Reduce model- Measurement Data # # Initialization of Co-Variance Matrix # # For BME_RM_ModelWeight # if NofMeasurements == 1: # RM_Model_Error = np.zeros((NofMeasurements, NofMeasurements), float) # np.fill_diagonal(RM_Model_Error, np.cov(RM_Model_Deviation.T)) # else: # RM_Model_Error = np.cov(RM_Model_Deviation.T) # # Computation of Weight according to the deviations # for i in range(SampleSize): # # For BME_RM_DataWeight # try: # var = Sigma[i] # if len(var)==1: # np.fill_diagonal(covMatrix, var) # else: # row,col = np.diag_indices(covMatrix.shape[0]) # covMatrix[row,col] = np.hstack((np.repeat(var[0], NofMeasurements*0.5),np.repeat(var[1], NofMeasurements*0.5))) # except: # var = Sigma # np.fill_diagonal(covMatrix, var) # # Add the std of the PCE is emulator is chosen. # # if self.emulator: # # covMatrix_PCE = np.zeros((NofMeasurements, NofMeasurements), float) # # stdPCE = np.empty((SampleSize,0)) # # for outputType in OutputType: # # stdPCE = np.hstack((stdPCE, stdOutputRS[outputType])) # # # # stdPCE = np.mean(stdPCE, axis=1) # # np.fill_diagonal(covMatrix_PCE, stdPCE**2) # # # # covMatrix = covMatrix + covMatrix_PCE # # Calculate the denomitor # denom1 = (np.sqrt(2*np.pi)) ** NofMeasurements # denom2 = (((2*np.pi)**(NofMeasurements/2)) * np.sqrt(np.linalg.det(covMatrix))) # BME_RM_Model_Weight[i] = (np.exp(-0.5 * np.dot(np.dot(RM_Model_Deviation[i], np.linalg.pinv(RM_Model_Error)), RM_Model_Deviation[i])))/denom1 # BME_RM_Data_Weight[i] = (np.exp(-0.5 * np.dot(np.dot(RM_Data_Deviation[i], np.linalg.pinv(covMatrix)), RM_Data_Deviation[i][:,np.newaxis])))/denom2 # for i in range(SampleSize): # BME_Corr[0] += BME_RM_Model_Weight[i] * BME_RM_Data_Weight[i] / np.nansum(BME_RM_Data_Weight) # return np.log(BME_Corr[0]) # ------------------------------------------------------------------------- def _rejection_sampling(self): """ Performs rejection sampling to update the prior distribution on the input parameters. Returns ------- posterior : pandas.dataframe Posterior samples of the input parameters. """ MetaModel = self.MetaModel try: sigma2_prior = self.Discrepancy.sigma2_prior except: sigma2_prior = None # Check if the discrepancy is defined as a distribution: samples = self.samples if sigma2_prior is not None: samples = np.hstack((samples, sigma2_prior)) # Take the first column of Likelihoods (Observation data without noise) likelihoods = np.exp(self.log_likes[:, 0], dtype=np.float128) n_samples = len(likelihoods) norm_ikelihoods = likelihoods / np.max(likelihoods) # Normalize based on min if all Likelihoods are zero if all(likelihoods == 0.0): likelihoods = self.log_likes[:, 0] norm_ikelihoods = likelihoods / np.min(likelihoods) # Random numbers between 0 and 1 unif = np.random.rand(1, n_samples)[0] # Reject the poorly performed prior accepted_samples = samples[norm_ikelihoods >= unif] # Output the Posterior par_names = MetaModel.ExpDesign.par_names if sigma2_prior is not None: for name in self.Discrepancy.name: par_names.append(name) return pd.DataFrame(accepted_samples, columns=sigma2_prior) # ------------------------------------------------------------------------- def _posterior_predictive(self): """ Stores the prior- and posterior predictive samples, i.e. model evaluations using the samples, into hdf5 files. priorPredictive.hdf5 : Prior predictive samples. 
postPredictive_wo_noise.hdf5 : Posterior predictive samples without the additive noise. postPredictive.hdf5 : Posterior predictive samples with the additive noise. Returns ------- None. """ MetaModel = self.MetaModel Model = MetaModel.ModelObj # Make a directory to save the prior/posterior predictive out_dir = f'Outputs_Bayes_{Model.name}_{self.name}' os.makedirs(out_dir, exist_ok=True) # Read observation data and perturb it if requested if self.measured_data is None: self.measured_data = Model.read_observation(case=self.name) if not isinstance(self.measured_data, pd.DataFrame): self.measured_data = pd.DataFrame(self.measured_data) # X_values x_values = MetaModel.ExpDesign.x_values try: sigma2_prior = self.Discrepancy.sigma2_prior except: sigma2_prior = None # Extract posterior samples posterior_df = self.posterior_df # Take care of the sigma2 if sigma2_prior is not None: try: sigma2s = posterior_df[self.Discrepancy.name].values posterior_df = posterior_df.drop( labels=self.Discrepancy.name, axis=1 ) except: sigma2s = self.sigma2s # Posterior predictive if self.emulator: if self.inference_method == 'rejection': prior_pred = self.__mean_pce_prior_pred if self.name.lower() != 'calib': post_pred = self.__mean_pce_prior_pred post_pred_std = self._std_pce_prior_pred else: post_pred, post_pred_std = MetaModel.eval_metamodel( samples=posterior_df.values ) else: if self.inference_method == 'rejection': prior_pred = self.__model_prior_pred if self.name.lower() != 'calib': post_pred = self.__mean_pce_prior_pred, post_pred_std = self._std_pce_prior_pred else: post_pred = self._eval_model( samples=posterior_df.values, key='PostPred' ) # Correct the predictions with Model discrepancy if hasattr(self, 'error_model') and self.error_model: y_hat, y_std = self.error_MetaModel.eval_model_error( self.bias_inputs, post_pred ) post_pred, post_pred_std = y_hat, y_std # Add discrepancy from likelihood Sample to the current posterior runs total_sigma2 = self.Discrepancy.total_sigma2 post_pred_withnoise = copy.deepcopy(post_pred) for varIdx, var in enumerate(Model.Output.names): for i in range(len(post_pred[var])): pred = post_pred[var][i] # Known sigma2s clean_sigma2 = total_sigma2[var][~np.isnan(total_sigma2[var])] tot_sigma2 = clean_sigma2[:len(pred)] cov = np.diag(tot_sigma2) # Check the type error term if sigma2_prior is not None: # Inferred sigma2s if hasattr(self, 'bias_inputs') and \ not hasattr(self, 'error_model'): # TODO: Infer a Bias model usig GPR bias_inputs = np.hstack(( self.bias_inputs[var], pred.reshape(-1, 1))) params = sigma2s[i, varIdx*3:(varIdx+1)*3] cov = self._kernel_rbf(bias_inputs, params) else: # Infer equal sigma2s try: sigma2 = sigma2s[i, varIdx] except TypeError: sigma2 = 0.0 # Convert biasSigma2s to a covMatrix cov += sigma2 * np.eye(len(pred)) if self.emulator: if hasattr(MetaModel, 'rmse') and \ MetaModel.rmse is not None: stdPCE = MetaModel.rmse[var] else: stdPCE = post_pred_std[var][i] # Expected value of variance (Assump: i.i.d stds) cov += np.diag(stdPCE**2) # Sample a multivariate normal distribution with mean of # prediction and variance of cov post_pred_withnoise[var][i] = np.random.multivariate_normal( pred, cov, 1 ) # ----- Prior Predictive ----- if self.inference_method.lower() == 'rejection': # Create hdf5 metadata hdf5file = f'{out_dir}/priorPredictive.hdf5' hdf5_exist = os.path.exists(hdf5file) if hdf5_exist: os.remove(hdf5file) file = h5py.File(hdf5file, 'a') # Store x_values if type(x_values) is dict: grp_x_values = file.create_group("x_values/") for varIdx, var in 
enumerate(Model.Output.names): grp_x_values.create_dataset(var, data=x_values[var]) else: file.create_dataset("x_values", data=x_values) # Store posterior predictive grpY = file.create_group("EDY/") for varIdx, var in enumerate(Model.Output.names): grpY.create_dataset(var, data=prior_pred[var]) # ----- Posterior Predictive only model evaluations ----- # Create hdf5 metadata hdf5file = out_dir+'/postPredictive_wo_noise.hdf5' hdf5_exist = os.path.exists(hdf5file) if hdf5_exist: os.remove(hdf5file) file = h5py.File(hdf5file, 'a') # Store x_values if type(x_values) is dict: grp_x_values = file.create_group("x_values/") for varIdx, var in enumerate(Model.Output.names): grp_x_values.create_dataset(var, data=x_values[var]) else: file.create_dataset("x_values", data=x_values) # Store posterior predictive grpY = file.create_group("EDY/") for varIdx, var in enumerate(Model.Output.names): grpY.create_dataset(var, data=post_pred[var]) # ----- Posterior Predictive with noise ----- # Create hdf5 metadata hdf5file = out_dir+'/postPredictive.hdf5' hdf5_exist = os.path.exists(hdf5file) if hdf5_exist: os.remove(hdf5file) file = h5py.File(hdf5file, 'a') # Store x_values if type(x_values) is dict: grp_x_values = file.create_group("x_values/") for varIdx, var in enumerate(Model.Output.names): grp_x_values.create_dataset(var, data=x_values[var]) else: file.create_dataset("x_values", data=x_values) # Store posterior predictive grpY = file.create_group("EDY/") for varIdx, var in enumerate(Model.Output.names): grpY.create_dataset(var, data=post_pred_withnoise[var]) return # ------------------------------------------------------------------------- def _plot_max_a_posteriori(self): """ Plots the response of the model output against that of the metamodel at the maximum a posteriori sample (mean or mode of posterior.) Returns ------- None. 
""" MetaModel = self.MetaModel Model = MetaModel.ModelObj out_dir = f'Outputs_Bayes_{Model.name}_{self.name}' opt_sigma = self.Discrepancy.opt_sigma # -------- Find MAP and run MetaModel and origModel -------- # Compute the MAP if self.max_a_posteriori.lower() == 'mean': if opt_sigma == "B": Posterior_df = self.posterior_df.values else: Posterior_df = self.posterior_df.values[:, :-Model.n_outputs] map_theta = Posterior_df.mean(axis=0).reshape( (1, MetaModel.n_params)) else: map_theta = stats.mode(Posterior_df.values, axis=0)[0] # Prin report print("\nPoint estimator:\n", map_theta[0]) # Run the models for MAP # MetaModel map_metamodel_mean, map_metamodel_std = MetaModel.eval_metamodel( samples=map_theta) self.map_metamodel_mean = map_metamodel_mean self.map_metamodel_std = map_metamodel_std # origModel map_orig_model = self._eval_model(samples=map_theta) self.map_orig_model = map_orig_model # Extract slicing index x_values = map_orig_model['x_values'] # List of markers and colors Color = ['k', 'b', 'g', 'r'] Marker = 'x' # Create a PdfPages object pdf = PdfPages(f'./{out_dir}MAP_PCE_vs_Model_{self.name}.pdf') fig = plt.figure() for i, key in enumerate(Model.Output.names): y_val = map_orig_model[key] y_pce_val = map_metamodel_mean[key] y_pce_val_std = map_metamodel_std[key] plt.plot(x_values, y_val, color=Color[i], marker=Marker, lw=2.0, label='$Y_{MAP}^{M}$') plt.plot( x_values, y_pce_val[i], color=Color[i], lw=2.0, marker=Marker, linestyle='--', label='$Y_{MAP}^{PCE}$' ) # plot the confidence interval plt.fill_between( x_values, y_pce_val[i] - 1.96*y_pce_val_std[i], y_pce_val[i] + 1.96*y_pce_val_std[i], color=Color[i], alpha=0.15 ) # Calculate the adjusted R_squared and RMSE R2 = r2_score(y_pce_val.reshape(-1, 1), y_val.reshape(-1, 1)) rmse = np.sqrt(mean_squared_error(y_pce_val, y_val)) plt.ylabel(key) plt.xlabel("Time [s]") plt.title(f'Model vs MetaModel {key}') ax = fig.axes[0] leg = ax.legend(loc='best', frameon=True) fig.canvas.draw() p = leg.get_window_extent().inverse_transformed(ax.transAxes) ax.text( p.p0[1]-0.05, p.p1[1]-0.25, f'RMSE = {rmse:.3f}\n$R^2$ = {R2:.3f}', transform=ax.transAxes, color='black', bbox=dict(facecolor='none', edgecolor='black', boxstyle='round,pad=1')) plt.show() # save the current figure pdf.savefig(fig, bbox_inches='tight') # Destroy the current plot plt.clf() pdf.close() # ------------------------------------------------------------------------- def _plot_post_predictive(self): """ Plots the posterior predictives against the observation data. Returns ------- None. 
""" Model = self.MetaModel.ModelObj out_dir = f'Outputs_Bayes_{Model.name}_{self.name}' # Plot the posterior predictive for out_idx, out_name in enumerate(Model.Output.names): fig, ax = plt.subplots() with sns.axes_style("ticks"): x_key = list(self.measured_data)[0] # --- Read prior and posterior predictive --- if self.inference_method == 'rejection': # --- Prior --- # Load posterior predictive f = h5py.File( f'{out_dir}/priorPredictive.hdf5', 'r+') try: x_coords = np.array(f[f"x_values/{out_name}"]) except: x_coords = np.array(f["x_values"]) X_values = np.repeat(x_coords, 10000) prior_pred_df = {} prior_pred_df[x_key] = X_values prior_pred_df[out_name] = np.array( f[f"EDY/{out_name}"])[:10000].flatten('F') prior_pred_df = pd.DataFrame(prior_pred_df) tags_post = ['prior'] * len(prior_pred_df) prior_pred_df.insert( len(prior_pred_df.columns), "Tags", tags_post, True) f.close() # --- Posterior --- f = h5py.File(f"{out_dir}/postPredictive.hdf5", 'r+') X_values = np.repeat( x_coords, np.array(f[f"EDY/{out_name}"]).shape[0]) post_pred_df = {} post_pred_df[x_key] = X_values post_pred_df[out_name] = np.array( f[f"EDY/{out_name}"]).flatten('F') post_pred_df = pd.DataFrame(post_pred_df) tags_post = ['posterior'] * len(post_pred_df) post_pred_df.insert( len(post_pred_df.columns), "Tags", tags_post, True) f.close() # Concatenate two dataframes based on x_values frames = [prior_pred_df, post_pred_df] all_pred_df = pd.concat(frames) # --- Plot posterior predictive --- sns.violinplot( x_key, y=out_name, data=all_pred_df, hue="Tags", legend=False, ax=ax, split=True, inner=None, color=".8") # --- Plot Data --- # Find the x,y coordinates for each point x_coords = np.arange(x_coords.shape[0]) first_header = list(self.measured_data)[0] obs_data = self.measured_data.round({first_header: 6}) sns.pointplot( x=first_header, y=out_name, color='g', markers='x', linestyles='', capsize=16, data=obs_data, ax=ax) ax.errorbar( x_coords, obs_data[out_name].values, yerr=1.96*self.measurement_error[out_name], ecolor='g', fmt=' ', zorder=-1) # Add labels to the legend handles, labels = ax.get_legend_handles_labels() labels.append('Data') data_marker = mlines.Line2D( [], [], color='lime', marker='+', linestyle='None', markersize=10) handles.append(data_marker) # Add legend ax.legend(handles=handles, labels=labels, loc='best', fontsize='large', frameon=True) else: # Load posterior predictive f = h5py.File(f"{out_dir}/postPredictive.hdf5", 'r+') try: x_coords = np.array(f["x_values"]) except: x_coords = np.array(f[f"x_values/{out_name}"]) mu = np.mean(np.array(f[f"EDY/{out_name}"]), axis=0) std = np.std(np.array(f[f"EDY/{out_name}"]), axis=0) # --- Plot posterior predictive --- plt.plot( x_coords, mu, marker='o', color='b', label='Mean Post. 
Predictive') plt.fill_between( x_coords, mu-1.96*std, mu+1.96*std, color='b', alpha=0.15) # --- Plot Data --- ax.plot( x_coords, self.measured_data[out_name].values, 'ko', label='data', markeredgecolor='w') # --- Plot ExpDesign --- orig_ED_Y = self.MetaModel.ExpDesign.Y[out_name] for output in orig_ED_Y: plt.plot( x_coords, output, color='grey', alpha=0.15 ) # Add labels for axes plt.xlabel('Time [s]') plt.ylabel(out_name) # Add labels to the legend handles, labels = ax.get_legend_handles_labels() patch = Patch(color='b', alpha=0.15) handles.insert(1, patch) labels.insert(1, '95 $\\%$ CI') # Add legend ax.legend(handles=handles, labels=labels, loc='best', frameon=True) # Save figure in pdf format if self.emulator: plotname = f'/Post_Prior_Perd_{Model.name}_emulator' else: plotname = f'/Post_Prior_Perd_{Model.name}' fig.savefig(f'./{out_dir}{plotname}_{out_name}.pdf', bbox_inches='tight')
Methods
def create_inference(self)
-
Starts the inference.
Returns
BayesInference : obj
    The Bayes inference object.
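After the call, the main results are available as attributes of the returned object (attribute names as assigned in the source below); a small sketch:

results = bayes.create_inference()
results.posterior_df.describe()  # posterior parameter samples (DataFrame)
results.log_BME                  # log Bayesian model evidence per data set
results.KLD                      # Kullback-Leibler divergence estimates
results.inf_entropy              # information entropy estimates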
Expand source code
def create_inference(self): """ Starts the inference. Returns ------- BayesInference : obj The Bayes inference object. """ # Set some variables MetaModel = self.MetaModel Model = MetaModel.ModelObj n_params = MetaModel.n_params output_names = Model.Output.names par_names = MetaModel.ExpDesign.par_names # If the prior is set by the user, take it. if self.samples is None: self.samples = MetaModel.ExpDesign.generate_samples( self.n_samples, 'random') else: try: samples = self.samples.values except AttributeError: samples = self.samples # Take care of an additional Sigma2s self.samples = samples[:, :n_params] # Update number of samples self.n_samples = self.samples.shape[0] # ---------- Preparation of observation data ---------- # Read observation data and perturb it if requested. if self.measured_data is None: self.measured_data = Model.read_observation(case=self.name) # Convert measured_data to a data frame if not isinstance(self.measured_data, pd.DataFrame): self.measured_data = pd.DataFrame(self.measured_data) # Extract the total number of measurement points if self.name.lower() == 'calib': self.n_tot_measurement = Model.n_obs else: self.n_tot_measurement = Model.n_obs_valid # Find measurement error (if not given) for post predictive plot if not hasattr(self, 'measurement_error'): if isinstance(self.Discrepancy, dict): Disc = self.Discrepancy['known'] else: Disc = self.Discrepancy if isinstance(Disc.parameters, dict): self.measurement_error = {k: np.sqrt(Disc.parameters[k]) for k in Disc.parameters.keys()} else: try: self.measurement_error = np.sqrt(Disc.parameters) except TypeError: pass # ---------- Preparation of variance for covariance matrix ---------- # Independent and identically distributed total_sigma2 = dict() opt_sigma_flag = isinstance(self.Discrepancy, dict) opt_sigma = None for key_idx, key in enumerate(output_names): # Find opt_sigma if opt_sigma_flag and opt_sigma is None: # Option A: known error with unknown bias term opt_sigma = 'A' known_discrepancy = self.Discrepancy['known'] self.Discrepancy = self.Discrepancy['infer'] sigma2 = np.array(known_discrepancy.parameters[key]) elif opt_sigma == 'A' or self.Discrepancy.parameters is not None: # Option B: The sigma2 is known (no bias term) if opt_sigma == 'A': sigma2 = np.array(known_discrepancy.parameters[key]) else: opt_sigma = 'B' sigma2 = np.array(self.Discrepancy.parameters[key]) elif not isinstance(self.Discrepancy.InputDisc, str): # Option C: The sigma2 is unknown (bias term including error) opt_sigma = 'C' self.Discrepancy.opt_sigma = opt_sigma n_measurement = self.measured_data[key].values.shape sigma2 = np.zeros((n_measurement[0])) total_sigma2[key] = sigma2 self.Discrepancy.opt_sigma = opt_sigma self.Discrepancy.total_sigma2 = total_sigma2 # If inferred sigma2s obtained from e.g. 
    # If inferred sigma2s obtained from e.g. calibration are given
    try:
        self.sigma2s = self.Discrepancy.get_sample(self.n_samples)
    except Exception:
        pass

    # ---------------- Bootstrap & TOM --------------------
    if self.bootstrap or self.bayes_loocv:
        if len(self.perturbed_data) == 0:
            # Add zero-mean noise to the observation data
            self.perturbed_data = self._perturb_data(
                self.measured_data, output_names
            )
        else:
            self.n_bootstrap_itrs = len(self.perturbed_data)

    # -------- Model Discrepancy -----------
    if hasattr(self, 'error_model') and self.error_model \
            and self.name.lower() != 'calib':
        # Select posterior mean as MAP
        MAP_theta = self.samples.mean(axis=0).reshape((1, n_params))
        # MAP_theta = stats.mode(self.samples, axis=0)[0]

        # Evaluate the (meta-)model at the MAP
        y_MAP, y_std_MAP = MetaModel.eval_metamodel(samples=MAP_theta)

        # Train a GPR meta-model using MAP
        self.error_MetaModel = MetaModel.create_model_error(
            self.bias_inputs, y_MAP, Name=self.name
        )

    # -----------------------------------------------------
    # ----- Loop over the perturbed observation data ------
    # -----------------------------------------------------
    # Initialize arrays
    logLikelihoods = np.zeros((self.n_samples, self.n_bootstrap_itrs),
                              dtype=np.float16)
    BME_Corr = np.zeros((self.n_bootstrap_itrs))
    log_BME = np.zeros((self.n_bootstrap_itrs))
    KLD = np.zeros((self.n_bootstrap_itrs))
    inf_entropy = np.zeros((self.n_bootstrap_itrs))

    # Compute the prior predictions
    # Evaluate the MetaModel
    if self.emulator:
        y_hat, y_std = MetaModel.eval_metamodel(samples=self.samples)
        self.__mean_pce_prior_pred = y_hat
        self._std_pce_prior_pred = y_std

        # Correct the predictions with the model discrepancy
        if hasattr(self, 'error_model') and self.error_model:
            y_hat_corr, y_std = self.error_MetaModel.eval_model_error(
                self.bias_inputs, self.__mean_pce_prior_pred
            )
            self.__mean_pce_prior_pred = y_hat_corr
            self._std_pce_prior_pred = y_std

        # Surrogate model's error using RMSE of test data
        if hasattr(MetaModel, 'rmse'):
            surrError = MetaModel.rmse
        else:
            surrError = None
    else:
        # Evaluate the original model
        self.__model_prior_pred = self._eval_model(
            samples=self.samples, key='PriorPred'
        )

    # Start the likelihood-BME computations for the perturbed data
    for itr_idx, data in tqdm(
            enumerate(self.perturbed_data),
            ascii=True, desc="Bootstrapping the BME calculations"
            ):

        # ---------------- Likelihood calculation ----------------
        if self.emulator:
            model_evals = self.__mean_pce_prior_pred
        else:
            model_evals = self.__model_prior_pred

        # Leave one out
        if self.bayes_loocv:
            self.selected_indices = np.nonzero(data)[0]

        # Prepare data dataframe
        nobs = list(self.measured_data.count().values[1:])
        numbers = list(map(sum, zip([0] + nobs, nobs)))
        indices = list(zip([0] + numbers, numbers))
        data_dict = {
            output_names[i]: data[j:k] for i, (j, k) in enumerate(indices)
        }

        # Unknown sigma2
        if opt_sigma == 'C' or hasattr(self, 'sigma2s'):
            logLikelihoods[:, itr_idx] = self.normpdf(
                model_evals, data_dict, total_sigma2,
                sigma2=self.sigma2s, std=surrError
            )
        else:
            # Known sigma2
            logLikelihoods[:, itr_idx] = self.normpdf(
                model_evals, data_dict, total_sigma2,
                std=surrError
            )

        # ---------------- BME Calculations ----------------
        # BME (log)
        log_BME[itr_idx] = np.log(
            np.nanmean(np.exp(logLikelihoods[:, itr_idx],
                              dtype=np.float128))
        )

        # Rejection Step
        # Random numbers between 0 and 1
        unif = np.random.rand(1, self.n_samples)[0]

        # Reject the poorly performing prior samples
        Likelihoods = np.exp(logLikelihoods[:, itr_idx], dtype=np.float64)
        accepted = (Likelihoods / np.max(Likelihoods)) >= unif
        posterior = self.samples[accepted]
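        # Note on the rejection step above: prior sample i is accepted when
        # L_i / max(L) >= u_i with u_i ~ U(0, 1), so the accepted set is an
        # unweighted draw from the posterior. It feeds the posterior
        # expectations below, from which KLD = E_post[log L] - log(BME) and
        # the information entropy are computed.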
        # Posterior-based expectation of likelihoods
        postExpLikelihoods = np.mean(
            logLikelihoods[:, itr_idx][accepted]
        )

        # Posterior-based expectation of prior densities
        postExpPrior = np.mean(
            np.log([MetaModel.ExpDesign.JDist.pdf(posterior.T)])
        )

        # Calculate Kullback-Leibler Divergence
        KLD[itr_idx] = postExpLikelihoods - log_BME[itr_idx]

        # Information Entropy based on Entropy paper Eq. 38
        inf_entropy[itr_idx] = log_BME[itr_idx] - postExpPrior - \
            postExpLikelihoods

        # TODO: BME correction when using Emulator
        # if self.emulator:
        #     BME_Corr[itr_idx] = self._corr_factor_BME(
        #         data, total_sigma2, posterior
        #     )

        # Clear memory
        gc.collect(generation=2)

    # ---------------- Store BME, Likelihoods for all ----------------
    # Likelihoods (Size: n_samples, n_bootstrap_itr)
    self.log_likes = logLikelihoods

    # BME (log), KLD, infEntropy (Size: 1, n_bootstrap_itr)
    self.log_BME = log_BME
    self.KLD = KLD
    self.inf_entropy = inf_entropy

    # TODO: BMECorrFactor (log) (Size: 1, n_bootstrap_itr)
    # if self.emulator:
    #     self.BMECorrFactor = BME_Corr

    # BME = BME + BMECorrFactor
    if self.emulator:
        self.log_BME = self.log_BME  # + self.BMECorrFactor

    # ---------------- Parameter Bayesian inference ----------------
    if self.inference_method.lower() == 'mcmc':
        # Instantiate the MCMC object
        MCMC_Obj = MCMC(self)
        self.posterior_df = MCMC_Obj.run_sampler(
            self.measured_data, total_sigma2
        )
    elif self.name.lower() == 'valid':
        # Convert to a dataframe if samples are provided after calibration.
        self.posterior_df = pd.DataFrame(self.samples, columns=par_names)
    else:
        # Rejection sampling
        self.posterior_df = self._rejection_sampling()

    # Print the posterior summary
    print('\n')
    print('-' * 15 + 'Posterior summary' + '-' * 15)
    pd.options.display.max_columns = None
    pd.options.display.max_rows = None
    print(self.posterior_df.describe())
    print('-' * 50)

    # -------- Model Discrepancy -----------
    if hasattr(self, 'error_model') and self.error_model \
            and self.name.lower() == 'calib':
        if self.inference_method.lower() == 'mcmc':
            self.error_MetaModel = MCMC_Obj.error_MetaModel
        else:
            # Drop the sigma2 columns unless sigma2 was fully known
            if opt_sigma == "B":
                posterior_df = self.posterior_df.values
            else:
                posterior_df = self.posterior_df.values[:, :-Model.n_outputs]

            # Select posterior mean as Maximum a posteriori
            map_theta = posterior_df.mean(axis=0).reshape((1, n_params))
            # map_theta = stats.mode(Posterior_df, axis=0)[0]

            # Evaluate the (meta-)model at the MAP
            y_MAP, y_std_MAP = MetaModel.eval_metamodel(samples=map_theta)

            # Train a GPR meta-model using MAP
            self.error_MetaModel = MetaModel.create_model_error(
                self.bias_inputs, y_MAP, Name=self.name
            )

    # -------- Posterior predictive -----------
    self._posterior_predictive()

    # -----------------------------------------------------
    # ------------------ Visualization --------------------
    # -----------------------------------------------------
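    # At this point the inference itself is complete: self.posterior_df,
    # self.log_BME, self.KLD and self.inf_entropy hold the results. The
    # remainder of the method only produces diagnostic plots and files.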
    # Create the output directory if it doesn't exist already.
    out_dir = f'Outputs_Bayes_{Model.name}_{self.name}'
    os.makedirs(out_dir, exist_ok=True)

    # -------- Posterior parameters --------
    if opt_sigma != "B":
        par_names.extend(
            [self.Discrepancy.InputDisc.Marginals[i].name
             for i in range(len(self.Discrepancy.InputDisc.Marginals))]
        )

    # Plot with corner
    figPosterior = corner.corner(self.posterior_df.to_numpy(),
                                 labels=par_names,
                                 quantiles=[0.15, 0.5, 0.85],
                                 show_titles=True,
                                 title_fmt=self.corner_title_fmt,
                                 labelpad=0.2,
                                 use_math_text=True,
                                 title_kwargs={"fontsize": 28},
                                 plot_datapoints=False,
                                 plot_density=False,
                                 fill_contours=True,
                                 smooth=0.5,
                                 smooth1d=0.5)

    # Loop over axes and set x limits
    if opt_sigma == "B":
        axes = np.array(figPosterior.axes).reshape(
            (len(par_names), len(par_names))
        )
        for yi in range(len(par_names)):
            ax = axes[yi, yi]
            ax.set_xlim(MetaModel.bound_tuples[yi])
            for xi in range(yi):
                ax = axes[yi, xi]
                ax.set_xlim(MetaModel.bound_tuples[xi])

    # Turn off gridlines
    for ax in figPosterior.axes:
        ax.grid(False)

    if self.emulator:
        plotname = f'/Posterior_Dist_{Model.name}_emulator'
    else:
        plotname = f'/Posterior_Dist_{Model.name}'

    figPosterior.set_size_inches((24, 16))
    figPosterior.savefig(f'./{out_dir}{plotname}.pdf',
                         bbox_inches='tight')

    # -------- Plot MAP --------
    if self.plot_map_pred:
        self._plot_max_a_posteriori()

    # -------- Plot log_BME dist --------
    if self.bootstrap and self.n_bootstrap_itrs > 1:

        # Compute the TOM performance
        self.log_BME_tom = stats.chi2.rvs(
            self.n_tot_measurement, size=self.log_BME.shape[0]
        )

        fig, ax = plt.subplots()
        sns.kdeplot(self.log_BME_tom, ax=ax, color="green", shade=True)
        sns.kdeplot(
            self.log_BME, ax=ax, color="blue", shade=True,
            label='Model BME')

        ax.set_xlabel('log$_{10}$(BME)')
        ax.set_ylabel('Probability density')

        legend_elements = [
            Patch(facecolor='green', edgecolor='green', label='TOM BME'),
            Patch(facecolor='blue', edgecolor='blue', label='Model BME')
        ]
        ax.legend(handles=legend_elements)

        if self.emulator:
            plotname = f'/BME_hist_{Model.name}_emulator'
        else:
            plotname = f'/BME_hist_{Model.name}'

        plt.savefig(f'./{out_dir}{plotname}.pdf', bbox_inches='tight')
        plt.show()
        plt.close()

    # -------- Posterior predictives --------
    if self.plot_post_pred:
        # Plot the posterior predictive
        self._plot_post_predictive()

    return self
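For orientation, a minimal calling sketch follows. Only the attribute names name, emulator, n_samples and Discrepancy.parameters appear in the source above; the constructor calls and the pre-trained surrogate object meta_model are assumptions, so treat this as a sketch rather than the canonical workflow.

import numpy as np
import bayesvalidrox as bvr

# Assumption: `meta_model` is an already-trained MetaModel whose ModelObj
# points to a PyLinkForwardModel with a single output named 'Z'.
bayes = bvr.BayesInference(meta_model)   # constructor signature assumed
bayes.name = 'Calib'                     # 'Calib' or 'Valid'
bayes.emulator = True                    # evaluate the surrogate, not the model
bayes.n_samples = 10000                  # prior sample size

# Known sigma^2 per output (option B): one variance per measurement point.
disc = bvr.Discrepancy('')               # constructor arguments assumed
disc.parameters = {'Z': np.full(10, 0.05**2)}
bayes.Discrepancy = disc

bayes = bayes.create_inference()         # sets posterior_df, log_BME, KLD, ...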
def normpdf(self, outputs, obs_data, total_sigma2s, sigma2=None, std=None)
-
Calculates the likelihood of the simulation outputs against the observation data and returns it in log scale.
Parameters
outputs
:dict
- A dictionary containing the simulation outputs as arrays of shape (n_samples, n_measurement) for each model output.
obs_data
:dict
- A dictionary/dataframe containing the observation data.
total_sigma2s
:dict
- A dictionary with the known values of the covariance diagonal entries, a.k.a. sigma^2.
sigma2
:array, optional
- An array of the sigma^2 samples, when the covariance diagonal entries are unknown and are being jointly inferred. The default is None.
std
:dict, optional
- A dictionary containing the root mean squared error as arrays of shape (n_samples, n_measurement) for each model output. The default is None.
Returns
logLik
:array of shape (n_samples)
- The log-likelihoods.
Expand source code
def normpdf(self, outputs, obs_data, total_sigma2s, sigma2=None, std=None):
    """
    Calculates the likelihood of the simulation outputs against the
    observation data and returns it in log scale.

    Parameters
    ----------
    outputs : dict
        A dictionary containing the simulation outputs as arrays of shape
        (n_samples, n_measurement) for each model output.
    obs_data : dict
        A dictionary/dataframe containing the observation data.
    total_sigma2s : dict
        A dictionary with the known values of the covariance diagonal
        entries, a.k.a. sigma^2.
    sigma2 : array, optional
        An array of the sigma^2 samples, when the covariance diagonal
        entries are unknown and are being jointly inferred. The default
        is None.
    std : dict, optional
        A dictionary containing the root mean squared error as arrays of
        shape (n_samples, n_measurement) for each model output. The
        default is None.

    Returns
    -------
    logLik : array of shape (n_samples)
        The log-likelihoods.

    """
    Model = self.MetaModel.ModelObj
    logLik = 0.0

    # Extract the requested model outputs for the likelihood calculation
    if self.req_outputs is None:
        req_outputs = Model.Output.names
    else:
        req_outputs = list(self.req_outputs)

    # Loop over the outputs
    for idx, out in enumerate(req_outputs):

        # (Meta-)model output
        nsamples, nout = outputs[out].shape

        # Prepare data and remove NaN
        try:
            data = obs_data[out].values[~np.isnan(obs_data[out])]
        except AttributeError:
            data = obs_data[out][~np.isnan(obs_data[out])]

        # Prepare sigma2s
        non_nan_indices = ~np.isnan(total_sigma2s[out])
        tot_sigma2s = total_sigma2s[out][non_nan_indices][:nout]

        # Add the std of the PCE if the emulator is used
        if self.emulator:
            if std is not None:
                std_pce = std[out]
            else:
                std_pce = np.mean(self._std_pce_prior_pred[out], axis=0)
            # Expected value of the variance (assumption: i.i.d. stds)
            tot_sigma2s += std_pce**2

        # If sigma2 is not given, use the given total_sigma2s
        if sigma2 is None:
            logLik += stats.multivariate_normal.logpdf(
                outputs[out], data, np.diag(tot_sigma2s))
            continue

        # Loop over each run/sample and calculate the log-likelihood
        logliks = np.zeros(nsamples)
        for s_idx in range(nsamples):

            # Simulation run
            tot_outputs = outputs[out]

            # Covariance matrix
            covMatrix = np.diag(tot_sigma2s)

            if sigma2 is not None:
                # Check the type of the error term
                if hasattr(self, 'bias_inputs') and \
                        not hasattr(self, 'error_model'):
                    # Infer a bias model using Gaussian process regression
                    bias_inputs = np.hstack(
                        (self.bias_inputs[out],
                         tot_outputs[s_idx].reshape(-1, 1)))
                    params = sigma2[s_idx, idx*3:(idx+1)*3]
                    covMatrix = self._kernel_rbf(bias_inputs, params)
                else:
                    # Infer equal sigma2s
                    try:
                        sigma_2 = sigma2[s_idx, idx]
                    except TypeError:
                        sigma_2 = 0.0
                    covMatrix += sigma_2 * np.eye(nout)
                    # covMatrix = np.diag(sigma2 * total_sigma2s)

            # Select the data points to compare
            if self.selected_indices is not None:
                indices = self.selected_indices[out]
                covMatrix = np.diag(covMatrix[indices, indices])
            else:
                indices = list(range(nout))

            # Compute the log-likelihood
            logliks[s_idx] = self._logpdf(
                tot_outputs[s_idx, indices], data[indices], covMatrix
            )
        logLik += logliks
    return logLik
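What normpdf evaluates per output is an ordinary multivariate Gaussian log-density with diagonal covariance. A minimal standalone sketch for the known-sigma^2 case (option B), with toy data and using only numpy and scipy; the function name and values are illustrative, not part of the package:

import numpy as np
from scipy import stats

def gaussian_loglik(model_runs, data, sigma2s):
    """Log N(data | run, diag(sigma2s)), evaluated for every run at once."""
    # model_runs: (n_samples, n_out); data and sigma2s: (n_out,)
    return stats.multivariate_normal.logpdf(
        model_runs, mean=data, cov=np.diag(sigma2s))

rng = np.random.default_rng(0)
runs = rng.normal(size=(5, 3))          # five prior runs, three obs points
obs = np.zeros(3)                       # toy observations
print(gaussian_loglik(runs, obs, np.full(3, 0.05**2)))   # -> shape (5,)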