Package bayesvalidrox

Expand source code
# -*- coding: utf-8 -*-
__version__ = "0.0.2"

from .pylink.pylink import PyLinkForwardModel
from .surrogate_models.surrogate_models import MetaModel
from .surrogate_models.inputs import Input
from .post_processing.post_processing import PostProcessing
from .bayes_inference.bayes_inference import BayesInference
from .bayes_inference.discrepancy import Discrepancy

__all__ = [
    "__version__",
    "PyLinkForwardModel",
    "Input",
    "Discrepancy",
    "MetaModel",
    "PostProcessing",
    "BayesInference"
    ]
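
A minimal end-to-end sketch of how the exported classes are typically combined. This is illustrative only: the marginal settings and the my_model module/function name are assumptions, not package defaults.

>>> import bayesvalidrox as bvr
>>> Inputs = bvr.Input()
>>> Inputs.add_marginals()
>>> Inputs.Marginals[0].name = 'X_1'
>>> Inputs.Marginals[0].dist_type = 'uniform'
>>> Inputs.Marginals[0].parameters = [-5, 5]
>>> Model = bvr.PyLinkForwardModel(link_type='function',
...                                py_file='my_model', name='my_model')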

Sub-modules

bayesvalidrox.bayes_inference
bayesvalidrox.post_processing
bayesvalidrox.pylink
bayesvalidrox.surrogate_models

Classes

class PyLinkForwardModel (link_type='pylink', name=None, py_file=None, shell_command='', input_file=None, input_template=None, aux_file=None, exe_path='', output_file_names=[], output_names=[], output_parser='', multi_process=True, n_cpus=None, meas_file=None, meas_file_valid=None, mc_ref_file=None, obs_dict={}, obs_dict_valid={}, mc_ref_dict={})

A forward model binder

This class serves as a code wrapper. This wrapper allows the execution of a third-party software/solver within the scope of BayesValidRox.

Attributes

link_type : str
The type of the wrapper. The default is 'pylink'. This runs the third-party software or an executable using a shell command with given input files. The second option is 'function', which assumes that the model can be run using a function written separately in a Python script.
name : str
Name of the model.
py_file : str
Python file name without the .py extension to be run for the 'function' wrapper. Note that the name of the Python file and that of the function must be identical. This function must receive the parameters in an array of shape (n_samples, n_params) and return a dictionary with the x_values and output arrays for the given output names.
shell_command : str
Shell command to be executed for the 'pylink' wrapper.
input_file : str or list
The input file to be passed to the 'pylink' wrapper.
input_template : str or list
A template input file to be passed to the 'pylink' wrapper. This file must be a copy of input_file with <Xi> placeholders for the input parameters defined using the Input class, where i is the parameter index. The file name should include .tpl before the actual extension of the input file, for example, params.tpl.input.
aux_file : str or list
The list of auxiliary files needed for the 'pylink' wrapper.
exe_path : str
Execution path if you wish to run the model for the 'pylink' wrapper in another directory. The default is '', which corresponds to the current working directory.
output_file_names : list of str
List of the names of the model output text files for the 'pylink' wrapper.
output_names : list of str
List of the model outputs to be used for the analysis.
output_parser : str
Name of the model parser file (without the .py extension) that receives the output_file_names and returns a 2d-array with the first row being the x_values, e.g. x coordinates or time, and the remaining rows containing the simulation output for each model output defined in output_names. Note that here, too, the name of the file and that of the function must be the same.
multi_process : bool
Whether the model runs are to be executed in parallel for the 'pylink' wrapper. The default is True.
n_cpus : int
The number of cpus to be used for the parallel model execution for the 'pylink' wrapper. The default is None, which corresponds to all available cpus.
meas_file : str
The name of the measurement text-based file. This file must contain x_values as the first column and one column for each model output. The default is None. Only needed for the Bayesian Inference.
meas_file_valid : str
The name of the measurement text-based file for the validation. The default is None. Only needed for the validation with Bayesian Inference.
mc_ref_file : str
The name of the text file for the Monte-Carlo reference (mean and standard deviation) values. It must contain x_values as the first column, mean as the second column and std as the third. It can be used to compare the moments estimated by the meta-model in the post-processing step. This is only available for one output.
obs_dict : dict
A dictionary containing the measurement data. It must contain x_values as the first item and one item for each model output. The default is {}. Only needed for the Bayesian Inference.
obs_dict_valid : dict
A dictionary containing the validation measurement data. It must contain x_values as the first item and one item for each model output. The default is {}.
mc_ref_dict : dict
A dictionary containing the Monte-Carlo reference (mean and standard deviation) values. It must contain x_values as the first item, mean as the second item and std as the third. The default is {}. This is only available for one output.
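
As a sketch of how these attributes fit together for the 'pylink' wrapper, consider the following configuration. All file, solver and output names here are hypothetical placeholders:

>>> Model = PyLinkForwardModel(
...     link_type='pylink',
...     name='my_solver',
...     shell_command='./solver.exe params.input',
...     input_file='params.input',
...     # copy of params.input with <X1>, <X2>, ... placeholders
...     input_template='params.tpl.input',
...     output_file_names=['results.csv'],
...     output_names=['pressure'],
...     # read_results.py must define a function read_results()
...     output_parser='read_results')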
Expand source code
class PyLinkForwardModel(object):
    """
    A forward model binder

    This class serves as a code wrapper. This wrapper allows the execution of
    a third-party software/solver within the scope of BayesValidRox.

    Attributes
    ----------
    link_type : str
        The type of the wrapper. The default is `'pylink'`. This runs the
        third-party software or an executable using a shell command with given
        input files.
        The second option is `'function'`, which assumes that the model can be
        run using a function written separately in a Python script.
    name : str
        Name of the model.
    py_file : str
        Python file name without the `.py` extension to be run for the
        `'function'` wrapper. Note that the name of the Python file and that of
        the function must be identical. This function must receive the
        parameters in an array of shape `(n_samples, n_params)` and return a
        dictionary with the x_values and output arrays for the given output
        names.
    shell_command : str
        Shell command to be executed for the `'pylink'` wrapper.
    input_file : str or list
        The input file to be passed to the `'pylink'` wrapper.
    input_template : str or list
        A template input file to be passed to the `'pylink'` wrapper. This file
        must be a copy of `input_file` with `<Xi>` placeholders for the input
        parameters defined using the `Input` class, where `i` is the parameter
        index. The file name should include `.tpl` before the actual extension
        of the input file, for example, `params.tpl.input`.
    aux_file : str or list
        The list of auxiliary files needed for the `'pylink'` wrapper.
    exe_path : str
        Execution path if you wish to run the model for the `'pylink'` wrapper
        in another directory. The default is `''`, which corresponds to the
        current working directory.
    output_file_names : list of str
        List of the names of the model output text files for the `'pylink'`
        wrapper.
    output_names : list of str
        List of the model outputs to be used for the analysis.
    output_parser : str
        Name of the model parser file (without the `.py` extension) that
        receives the `output_file_names` and returns a 2d-array with the first
        row being the x_values, e.g. x coordinates or time, and the remaining
        rows containing the simulation output for each model output defined in
        `output_names`. Note that here, too, the name of the file and that of
        the function must be the same.
    multi_process: bool
        Whether the model runs are to be executed in parallel for the `'pylink'`
        wrapper. The default is `True`.
    n_cpus: int
        The number of cpus to be used for the parallel model execution for the
        `'pylink'` wrapper. The default is `None`, which corresponds to all
        available cpus.
    meas_file : str
        The name of the measurement text-based file. This file must contain
        x_values as the first column and one column for each model output. The
        default is `None`. Only needed for the Bayesian Inference.
    meas_file_valid : str
        The name of the measurement text-based file for the validation. The
        default is `None`. Only needed for the validation with Bayesian
        Inference.
    mc_ref_file : str
        The name of the text file for the Monte-Carlo reference (mean and
        standard deviation) values. It must contain `x_values` as the first
        column, `mean` as the second column and `std` as the third. It can be
        used to compare the moments estimated by the meta-model in the
        post-processing step. This is only available for one output.
    obs_dict : dict
        A dictionary containing the measurement data. It must contain
        `x_values` as the first item and one item for each model output. The
        default is `{}`. Only needed for the Bayesian Inference.
    obs_dict_valid : dict
        A dictionary containing the validation measurement data. It
        must contain `x_values` as the first item and one item for each model
        output. The default is `{}`.
    mc_ref_dict : dict
        A dictionary containing the Monte-Carlo reference (mean and standard
        deviation) values. It must contain `x_values` as the first item and
        `mean` as the second item and `std` as the third. The default is `{}`.
        This is only available for one output.
    """

    def __init__(self, link_type='pylink', name=None, py_file=None,
                 shell_command='', input_file=None, input_template=None,
                 aux_file=None, exe_path='', output_file_names=[],
                 output_names=[], output_parser='', multi_process=True,
                 n_cpus=None, meas_file=None, meas_file_valid=None,
                 mc_ref_file=None, obs_dict={}, obs_dict_valid={},
                 mc_ref_dict={}):
        self.link_type = link_type
        self.name = name
        self.shell_command = shell_command
        self.py_file = py_file
        self.input_file = input_file
        self.input_template = input_template
        self.aux_file = aux_file
        self.exe_path = exe_path
        self.multi_process = multi_process
        self.n_cpus = n_cpus
        self.Output.parser = output_parser
        self.Output.names = output_names
        self.Output.file_names = output_file_names
        self.meas_file = meas_file
        self.meas_file_valid = meas_file_valid
        self.mc_ref_file = mc_ref_file
        self.observations = obs_dict
        self.observations_valid = obs_dict_valid
        self.mc_reference = mc_ref_dict

    # Nested class
    class Output:
        def __init__(self):
            self.parser = None
            self.names = None
            self.file_names = None

    # -------------------------------------------------------------------------
    def within_range(self, out, minout, maxout):
        inside = False
        if (out > minout).all() and (out < maxout).all():
            inside = True
        return inside

    # -------------------------------------------------------------------------
    def read_observation(self, case='calib'):
        """
        Reads and prepares the observation (measurement) data for
        calibration or validation.

        Returns
        -------
        DataFrame
            A dataframe with the calibration data.

        """
        if case.lower() == 'calib':
            if isinstance(self.observations, dict) and bool(self.observations):
                obs = pd.DataFrame.from_dict(self.observations)
            elif self.meas_file is not None:
                file_path = os.path.join(os.getcwd(), self.meas_file)
                obs = pd.read_csv(file_path, delimiter=',')
            elif isinstance(self.observations, pd.DataFrame):
                obs = self.observations
            else:
                raise Exception("Please provide the observation data as a "
                                "dictionary via observations attribute or pass"
                                " the csv-file path to MeasurementFile "
                                "attribute")
        elif case.lower() == 'valid':
            if isinstance(self.observations_valid, dict) and \
              bool(self.observations_valid):
                obs = pd.DataFrame.from_dict(self.observations_valid)
            elif self.meas_file_valid is not None:
                file_path = os.path.join(os.getcwd(), self.meas_file_valid)
                obs = pd.read_csv(file_path, delimiter=',')
            elif isinstance(self.observations_valid, pd.DataFrame):
                obs = self.observations_valid
            else:
                raise Exception("Please provide the observation data as a "
                                "dictionary via Observations attribute or pass"
                                " the csv-file path to MeasurementFile "
                                "attribute")

        # Compute the number of observations
        n_obs = obs[self.Output.names].notnull().sum().values.sum()

        if case.lower() == 'calib':
            self.observations = obs
            self.n_obs = n_obs
            return self.observations
        elif case.lower() == 'valid':
            self.observations_valid = obs
            self.n_obs_valid = n_obs
            return self.observations_valid

    # -------------------------------------------------------------------------
    def read_mc_reference(self):
        """
        Reads the Monte-Carlo reference data, if available, for
        further in-depth post-processing after meta-model training.

        Returns
        -------
        None

        """
        if self.mc_ref_file is None and not hasattr(self, 'mc_reference'):
            return
        elif isinstance(self.mc_reference, dict) and bool(self.mc_reference):
            self.mc_reference = pd.DataFrame.from_dict(self.mc_reference)
        elif self.mc_ref_file is not None:
            file_path = os.path.join(os.getcwd(), self.mc_ref_file)
            self.mc_reference = pd.read_csv(file_path, delimiter=',')
        else:
            raise Exception("Please provide the MC reference data as a "
                            "dictionary via mc_reference attribute or pass the"
                            " csv-file path to mc_ref_file attribute")
        return self.mc_reference

    # -------------------------------------------------------------------------
    def read_output(self):
        """
        Imports the output parser as a function, applies it to the model
        output files and returns the parsed simulation outputs. It is required
        when the model returns the simulation outputs in csv files.

        Returns
        -------
        output : array
            Parsed simulation outputs.

        """
        output_func_name = self.Output.parser

        output_func = getattr(__import__(output_func_name), output_func_name)

        file_names = []
        for File in self.Output.file_names:
            file_names.append(os.path.join(self.exe_path, File))
        try:
            output = output_func(self.name, file_names)
        except TypeError:
            output = output_func(file_names)
        return output

    # -------------------------------------------------------------------------
    def update_input_params(self, new_input_file, param_set):
        """
        Finds the <Xi> placeholders in the files listed in new_input_file and
        replaces them with the new values from the array param_set.

        Parameters
        ----------
        new_input_file : list
            List of the input files with the adapted names.
        param_set : array of shape (n_params)
            Parameter set.

        Returns
        -------
        None.

        """
        NofPa = param_set.shape[0]
        text_to_search_list = [f'<X{i+1}>' for i in range(NofPa)]

        for filename in new_input_file:
            # Read in the file
            with open(filename, 'r') as file:
                filedata = file.read()

            # Replace the target string
            for text_to_search, params in zip(text_to_search_list, param_set):
                filedata = filedata.replace(text_to_search, f'{params:0.4e}')

            # Write the file out again
            with open(filename, 'w') as file:
                file.write(filedata)

    # -------------------------------------------------------------------------
    def run_command(self, command, output_file_names):
        """
        Runs the execution command given by the user to run the given model.
        It checks whether the output files have been generated. If so, the job
        is done and it extracts and returns the requested output(s). Otherwise,
        it executes the command again.

        Parameters
        ----------
        command : str
            The shell command to be executed.
        output_file_names : list
            Names of the expected output files.

        Returns
        -------
        simulation_outputs : array of shape (n_obs, n_outputs)
            Simulation outputs.

        """

        # Check if simulation is finished
        while True:
            time.sleep(3)
            files = os.listdir(".")
            if all(elem in files for elem in output_file_names):
                break
            else:
                # Run command
                Process = os.system(f'./../{command}')
                if Process != 0:
                    print('\nMessage 1:')
                    print(f'\tNon-zero return value \'{Process}\' indicates '
                          'compilation or execution problems.\n')

        os.chdir("..")

        # Read the output
        simulation_outputs = self.read_output()

        return simulation_outputs

    # -------------------------------------------------------------------------
    def run_forwardmodel(self, xx):
        """
        This function creates a subdirectory for the current run, copies the
        necessary files to this directory and renames them. Next, it executes
        the given command.

        Parameters
        ----------
        xx : tuple
            A tuple including parameter set, simulation number and key string.

        Returns
        -------
        output : array of shape (n_outputs+1, n_obs)
            An array passed by the output parser containing the x_values as
            the first row and the simulation results stored in the rest of
            the array.

        """
        c_points, run_no, key_str = xx

        # Handle the case where only one input file is provided
        if not isinstance(self.input_template, list):
            self.input_template = [self.input_template]
        if not isinstance(self.input_file, list):
            self.input_file = [self.input_file]

        new_input_file = []
        # Loop over the InputTemplates:
        for in_temp in self.input_template:
            if '/' in in_temp:
                in_temp = in_temp.split('/')[-1]
            new_input_file.append(in_temp.split('.tpl')[0] + key_str +
                                  f"_{run_no+1}" + in_temp.split('.tpl')[1])

        # Create directories
        newpath = self.name + key_str + f'_{run_no+1}'
        if not os.path.exists(newpath):
            os.makedirs(newpath)

        # Copy the necessary files to the directories
        for in_temp in self.input_template:
            # Input file(s) of the model
            shutil.copy2(in_temp, newpath)
        # Auxiliary file
        if self.aux_file is not None:
            shutil.copy2(self.aux_file, newpath)  # Auxiliary file

        # Rename the input file and/or auxiliary file
        os.chdir(newpath)
        for input_tem, input_file in zip(self.input_template, new_input_file):
            if '/' in input_tem:
                input_tem = input_tem.split('/')[-1]
            os.rename(input_tem, input_file)

        # Update the parameters in the input file
        self.update_input_params(new_input_file, c_points)

        # Update the user defined command and the execution path
        try:
            new_command = self.shell_command.replace(self.input_file[0],
                                                     new_input_file[0])
            new_command = new_command.replace(self.input_file[1],
                                              new_input_file[1])
        except IndexError:  # only one input file was provided
            new_command = self.shell_command.replace(self.input_file[0],
                                                     new_input_file[0])
        # Set the exe path if not provided
        if not bool(self.exe_path):
            self.exe_path = os.getcwd()

        # Run the model
        output = self.run_command(new_command, self.Output.file_names)

        return output

    # -------------------------------------------------------------------------
    def run_model_parallel(self, c_points, prevRun_No=0, key_str='',
                           mp=True):
        """
        Runs model simulations. If mp is True (default), the simulations
        are started in parallel.

        Parameters
        ----------
        c_points : array of shape (n_samples, n_params)
            Collocation points (training set).
        prevRun_No : int, optional
            Previous run number, in case the sequential design is selected.
            The default is `0`.
        key_str : str, optional
            A descriptive string for validation runs. The default is `''`.
        mp : bool, optional
            Multiprocessing. The default is `True`.

        Returns
        -------
        all_outputs : dict
            A dictionary with x values (time step or point id) and all outputs.
            Each key contains an array of the shape `(n_samples, n_obs)`.
        new_c_points : array
            Updated collocation points (training set). If a simulation does
            not execute successfully, the corresponding parameter set is
            removed.

        """

        # Create hdf5 metadata
        hdf5file = f'ExpDesign_{self.name}.hdf5'
        hdf5_exist = os.path.exists(hdf5file)
        file = h5py.File(hdf5file, 'a')

        # Initialization
        n_c_points = len(c_points)
        self.n_outputs = len(self.Output.names)
        all_outputs = {}

        # Extract the function
        if self.link_type.lower() == 'function':
            # Prepare the function
            Function = getattr(__import__(self.py_file), self.py_file)
        # ---------------------------------------------------------------
        # -------------- Multiprocessing with Pool Class ----------------
        # ---------------------------------------------------------------
        # Start a pool with the number of CPUs
        if self.n_cpus is None:
            n_cpus = multiprocessing.cpu_count()
        else:
            n_cpus = self.n_cpus

        # Run forward model either normal or with multiprocessing
        if not self.multi_process:
            group_results = list([self.run_forwardmodel((c_points,
                                                         prevRun_No,
                                                         key_str))])
        else:
            with multiprocessing.Pool(n_cpus) as p:
                desc = f'Running forward model {key_str}'
                if self.link_type.lower() == 'function':
                    imap_var = p.imap(Function, c_points[:, np.newaxis])
                else:
                    args = zip(c_points,
                               [prevRun_No+i for i in range(n_c_points)],
                               [key_str]*n_c_points)
                    imap_var = p.imap(self.run_forwardmodel, args)

                group_results = list(tqdm.tqdm(imap_var, total=n_c_points,
                                               desc=desc))

        # Save time steps or x-values
        x_values = group_results[0][0]
        all_outputs["x_values"] = x_values
        if not hdf5_exist:
            if type(x_values) is dict:
                grp_x_values = file.create_group("x_values/")
                for varIdx, var in enumerate(self.Output.names):
                    grp_x_values.create_dataset(var, data=x_values[var])
            else:
                file.create_dataset("x_values", data=x_values)

        # Save each output in its corresponding array
        NaN_idx = []
        for varIdx, var in enumerate(self.Output.names):

            if not hdf5_exist:
                grpY = file.create_group("EDY/"+var)
            else:
                grpY = file.get("EDY/"+var)

            Outputs = np.asarray([item[varIdx+1] for item in group_results],
                                 dtype=np.float64)

            if prevRun_No == 0 and key_str == '':
                grpY.create_dataset(f'init_{key_str}', data=Outputs)
            else:
                try:
                    oldEDY = np.array(file[f'EDY/{var}/adaptive_{key_str}'])
                    del file[f'EDY/{var}/adaptive_{key_str}']
                    data = np.vstack((oldEDY, Outputs))
                except KeyError:
                    data = Outputs
                grpY.create_dataset('adaptive_'+key_str, data=data)

            NaN_idx = np.unique(np.argwhere(np.isnan(Outputs))[:, 0])
            all_outputs[var] = np.delete(Outputs, NaN_idx, axis=0)

            if prevRun_No == 0 and key_str == '':
                grpY.create_dataset(f"New_init_{key_str}",
                                    data=all_outputs[var])
            else:
                try:
                    name = f'EDY/{var}/New_adaptive_{key_str}'
                    oldEDY = np.array(file[name])
                    del file[f'EDY/{var}/New_adaptive_{key_str}']
                    data = np.vstack((oldEDY, all_outputs[var]))
                except KeyError:
                    data = all_outputs[var]
                grpY.create_dataset(f'New_adaptive_{key_str}', data=data)

        # Print the collocation points whose simulations crashed
        if len(NaN_idx) != 0:
            print('\n')
            print('*'*20)
            print("\nThe following parametersets have been removed:\n",
                  c_points[NaN_idx])
            print("\n")
            print('*'*20)

        # Pass it to the attribute
        new_c_points = np.delete(c_points, NaN_idx, axis=0)
        self.OutputMatrix = all_outputs

        # Save CollocationPoints
        grpX = file.create_group("EDX") if not hdf5_exist else file.get("EDX")
        if prevRun_No == 0 and key_str == '':
            grpX.create_dataset("init_"+key_str, data=c_points)
            if len(NaN_idx) != 0:
                grpX.create_dataset("New_init_"+key_str, data=new_c_points)

        else:
            try:
                name = f'EDX/adaptive_{key_str}'
                oldCollocationPoints = np.array(file[name])
                del file[f'EDX/adaptive_{key_str}']
                data = np.vstack((oldCollocationPoints, new_c_points))
            except KeyError:
                data = new_c_points
            grpX.create_dataset('adaptive_'+key_str, data=data)

            if len(NaN_idx) != 0:
                try:
                    name = f'EDX/New_adaptive_{key_str}'
                    oldCollocationPoints = np.array(file[name])
                    del file[f'EDX/New_adaptive_{key_str}']
                    data = np.vstack((oldCollocationPoints, new_c_points))
                except KeyError:
                    data = new_c_points
                grpX.create_dataset('New_adaptive_'+key_str, data=data)

        # Close h5py file
        file.close()

        return all_outputs, new_c_points

    # -------------------------------------------------------------------------
    def zip_subdirs(self, dir_name, key):
        """
        Zips all files in subdirectories whose names contain the given
        key(word), then removes those subdirectories.

        Parameters
        ----------
        dir_name : str
            Directory name.
        key : str
            Keyword to search for.

        Returns
        -------
        None.

        """
        # setup file paths variable
        dir_list = []
        file_paths = []

        # Read all directory, subdirectories and file lists
        dir_path = os.getcwd()

        for root, directories, files in os.walk(dir_path):
            for directory in directories:
                # Create the full filepath by using os module.
                if key in directory:
                    folderPath = os.path.join(dir_path, directory)
                    dir_list.append(folderPath)

        # Loop over the identified directories to store the file paths
        for direct_name in dir_list:
            for root, directories, files in os.walk(direct_name):
                for filename in files:
                    # Create the full filepath by using os module.
                    filePath = os.path.join(root, filename)
                    file_paths.append('.'+filePath.split(dir_path)[1])

        # writing files to a zipfile
        if len(file_paths) != 0:
            zip_file = zipfile.ZipFile(dir_name+'.zip', 'w')
            with zip_file:
                # writing each file one by one
                for file in file_paths:
                    zip_file.write(file)

            file_paths = [path for path in os.listdir('.') if key in path]

            for path in file_paths:
                shutil.rmtree(path)

            print("\n")
            print(f'{dir_name}.zip file has been created successfully!\n')

        return

Class variables

var Output

Methods

def within_range(self, out, minout, maxout)
Expand source code
def within_range(self, out, minout, maxout):
    inside = False
    if (out > minout).all() and (out < maxout).all():
        inside = True
    return inside
def read_observation(self, case='calib')

Reads and prepares the observation (measurement) data for calibration or validation.

Returns

DataFrame
A dataframe with the calibration data.
Expand source code
def read_observation(self, case='calib'):
    """
    Reads and prepares the observation (measurement) data for
    calibration or validation.

    Returns
    -------
    DataFrame
        A dataframe with the calibration data.

    """
    if case.lower() == 'calib':
        if isinstance(self.observations, dict) and bool(self.observations):
            obs = pd.DataFrame.from_dict(self.observations)
        elif self.meas_file is not None:
            file_path = os.path.join(os.getcwd(), self.meas_file)
            obs = pd.read_csv(file_path, delimiter=',')
        elif isinstance(self.observations, pd.DataFrame):
            obs = self.observations
        else:
            raise Exception("Please provide the observation data as a "
                            "dictionary via observations attribute or pass"
                            " the csv-file path to MeasurementFile "
                            "attribute")
    elif case.lower() == 'valid':
        if isinstance(self.observations_valid, dict) and \
          bool(self.observations_valid):
            obs = pd.DataFrame.from_dict(self.observations_valid)
        elif self.meas_file_valid is not None:
            file_path = os.path.join(os.getcwd(), self.meas_file_valid)
            obs = pd.read_csv(file_path, delimiter=',')
        elif isinstance(self.observations_valid, pd.DataFrame):
            obs = self.observations_valid
        else:
            raise Exception("Please provide the observation data as a "
                            "dictionary via Observations attribute or pass"
                            " the csv-file path to MeasurementFile "
                            "attribute")

    # Compute the number of observations
    n_obs = obs[self.Output.names].notnull().sum().values.sum()

    if case.lower() == 'calib':
        self.observations = obs
        self.n_obs = n_obs
        return self.observations
    elif case.lower() == 'valid':
        self.observations_valid = obs
        self.n_obs_valid = n_obs
        return self.observations_valid
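
A minimal usage sketch, assuming a model whose Output.names include 'pressure'; the measurement values are made up:

>>> Model.observations = {'x_values': [0.0, 0.5, 1.0],
...                       'pressure': [1.02, 0.87, 0.65]}
>>> obs_df = Model.read_observation(case='calib')  # pandas DataFrame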
def read_mc_reference(self)

Reads the Monte-Carlo reference data, if available, for further in-depth post-processing after meta-model training.

Returns

None
 
Expand source code
def read_mc_reference(self):
    """
    Reads the Monte-Carlo reference data, if available, for
    further in-depth post-processing after meta-model training.

    Returns
    -------
    None

    """
    if self.mc_ref_file is None and not hasattr(self, 'mc_reference'):
        return
    elif isinstance(self.mc_reference, dict) and bool(self.mc_reference):
        self.mc_reference = pd.DataFrame.from_dict(self.mc_reference)
    elif self.mc_ref_file is not None:
        file_path = os.path.join(os.getcwd(), self.mc_ref_file)
        self.mc_reference = pd.read_csv(file_path, delimiter=',')
    else:
        raise Exception("Please provide the MC reference data as a "
                        "dictionary via mc_reference attribute or pass the"
                        " csv-file path to mc_ref_file attribute")
    return self.mc_reference
def read_output(self)

Imports the output parser as a function, applies it to the model output files and returns the parsed simulation outputs. It is required when the model returns the simulation outputs in csv files.

Returns

output : array
Parsed simulation outputs.
Expand source code
def read_output(self):
    """
    Imports the output parser as a function, applies it to the model
    output files and returns the parsed simulation outputs. It is required
    when the model returns the simulation outputs in csv files.

    Returns
    -------
    output : array
        Parsed simulation outputs.

    """
    output_func_name = self.Output.parser

    output_func = getattr(__import__(output_func_name), output_func_name)

    file_names = []
    for File in self.Output.file_names:
        file_names.append(os.path.join(self.exe_path, File))
    try:
        output = output_func(self.name, file_names)
    except TypeError:
        output = output_func(file_names)
    return output
def update_input_params(self, new_input_file, param_set)

Finds the <Xi> placeholders in the new_input_file and replaces them with the new values from the array param_set.

Parameters

new_input_file : list
List of the input files with the adapted names.
param_set : array of shape (n_params)
Parameter set.

Returns

None.

Expand source code
def update_input_params(self, new_input_file, param_set):
    """
    Finds the <Xi> placeholders in the files listed in new_input_file and
    replaces them with the new values from the array param_set.

    Parameters
    ----------
    new_input_file : list
        List of the input files with the adapted names.
    param_set : array of shape (n_params)
        Parameter set.

    Returns
    -------
    None.

    """
    NofPa = param_set.shape[0]
    text_to_search_list = [f'<X{i+1}>' for i in range(NofPa)]

    for filename in new_input_file:
        # Read in the file
        with open(filename, 'r') as file:
            filedata = file.read()

        # Replace the target string
        for text_to_search, params in zip(text_to_search_list, param_set):
            filedata = filedata.replace(text_to_search, f'{params:0.4e}')

        # Write the file out again
        with open(filename, 'w') as file:
            file.write(filedata)
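
To illustrate the substitution, the following hedged sketch writes a small template-style file and updates it in place. It assumes Model is an existing PyLinkForwardModel instance; the file name and contents are hypothetical:

>>> import numpy as np
>>> with open('params_1.input', 'w') as f:
...     _ = f.write('viscosity = <X1>\nporosity = <X2>\n')
>>> Model.update_input_params(['params_1.input'], np.array([0.003, 0.25]))
>>> # params_1.input now reads:
>>> #   viscosity = 3.0000e-03
>>> #   porosity = 2.5000e-01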
def run_command(self, command, output_file_names)

Runs the execution command given by the user to run the given model. It checks whether the output files have been generated. If so, the job is done and it extracts and returns the requested output(s). Otherwise, it executes the command again.

Parameters

command : str
The shell command to be executed.
output_file_names : list
Names of the expected output files.

Returns

simulation_outputs : array of shape (n_obs, n_outputs)
Simulation outputs.
Expand source code
def run_command(self, command, output_file_names):
    """
    Runs the execution command given by the user to run the given model.
    It checks whether the output files have been generated. If so, the job
    is done and it extracts and returns the requested output(s). Otherwise,
    it executes the command again.

    Parameters
    ----------
    command : str
        The shell command to be executed.
    output_file_names : list
        Names of the expected output files.

    Returns
    -------
    simulation_outputs : array of shape (n_obs, n_outputs)
        Simulation outputs.

    """

    # Check if simulation is finished
    while True:
        time.sleep(3)
        files = os.listdir(".")
        if all(elem in files for elem in output_file_names):
            break
        else:
            # Run command
            Process = os.system(f'./../{command}')
            if Process != 0:
                print('\nMessage 1:')
                print(f'\tNon-zero return value \'{Process}\' indicates '
                      'compilation or execution problems.\n')

    os.chdir("..")

    # Read the output
    simulation_outputs = self.read_output()

    return simulation_outputs
def run_forwardmodel(self, xx)

This function creates a subdirectory for the current run, copies the necessary files to this directory and renames them. Next, it executes the given command.

Parameters

xx : tuple
A tuple including parameter set, simulation number and key string.

Returns

output : array of shape (n_outputs+1, n_obs)
An array passed by the output parser containing the x_values as the first row and the simulation results stored in the rest of the array.
Expand source code
def run_forwardmodel(self, xx):
    """
    This function creates a subdirectory for the current run, copies the
    necessary files to this directory and renames them. Next, it executes
    the given command.

    Parameters
    ----------
    xx : tuple
        A tuple including parameter set, simulation number and key string.

    Returns
    -------
    output : array of shape (n_outputs+1, n_obs)
        An array passed by the output parser containing the x_values as
        the first row and the simulation results stored in the rest of
        the array.

    """
    c_points, run_no, key_str = xx

    # Handle the case where only one input file is provided
    if not isinstance(self.input_template, list):
        self.input_template = [self.input_template]
    if not isinstance(self.input_file, list):
        self.input_file = [self.input_file]

    new_input_file = []
    # Loop over the InputTemplates:
    for in_temp in self.input_template:
        if '/' in in_temp:
            in_temp = in_temp.split('/')[-1]
        new_input_file.append(in_temp.split('.tpl')[0] + key_str +
                              f"_{run_no+1}" + in_temp.split('.tpl')[1])

    # Create directories
    newpath = self.name + key_str + f'_{run_no+1}'
    if not os.path.exists(newpath):
        os.makedirs(newpath)

    # Copy the necessary files to the directories
    for in_temp in self.input_template:
        # Input file(s) of the model
        shutil.copy2(in_temp, newpath)
    # Auxiliary file
    if self.aux_file is not None:
        shutil.copy2(self.aux_file, newpath)  # Auxiliary file

    # Rename the input file and/or auxiliary file
    os.chdir(newpath)
    for input_tem, input_file in zip(self.input_template, new_input_file):
        if '/' in input_tem:
            input_tem = input_tem.split('/')[-1]
        os.rename(input_tem, input_file)

    # Update the parameters in the input file
    self.update_input_params(new_input_file, c_points)

    # Update the user defined command and the execution path
    try:
        new_command = self.shell_command.replace(self.input_file[0],
                                                 new_input_file[0])
        new_command = new_command.replace(self.input_file[1],
                                          new_input_file[1])
    except IndexError:  # only one input file was provided
        new_command = self.shell_command.replace(self.input_file[0],
                                                 new_input_file[0])
    # Set the exe path if not provided
    if not bool(self.exe_path):
        self.exe_path = os.getcwd()

    # Run the model
    output = self.run_command(new_command, self.Output.file_names)

    return output
def run_model_parallel(self, c_points, prevRun_No=0, key_str='', mp=True)

Runs model simulations. If mp is True (default), the simulations are started in parallel.

Parameters

c_points : array of shape (n_samples, n_params)
Collocation points (training set).
prevRun_No : int, optional
Previous run number, in case the sequential design is selected. The default is 0.
key_str : str, optional
A descriptive string for validation runs. The default is ''.
mp : bool, optional
Multiprocessing. The default is True.

Returns

all_outputs : dict
A dictionary with x values (time step or point id) and all outputs. Each key contains an array of the shape (n_samples, n_obs).
new_c_points : array
Updated collocation points (training set). If a simulation does not execute successfully, the corresponding parameter set is removed.
Expand source code
def run_model_parallel(self, c_points, prevRun_No=0, key_str='',
                       mp=True):
    """
    Runs model simulations. If mp is True (default), the simulations
    are started in parallel.

    Parameters
    ----------
    c_points : array of shape (n_samples, n_params)
        Collocation points (training set).
    prevRun_No : int, optional
        Previous run number, in case the sequential design is selected.
        The default is `0`.
    key_str : str, optional
        A descriptive string for validation runs. The default is `''`.
    mp : bool, optional
        Multiprocessing. The default is `True`.

    Returns
    -------
    all_outputs : dict
        A dictionary with x values (time step or point id) and all outputs.
        Each key contains an array of the shape `(n_samples, n_obs)`.
    new_c_points : array
        Updated collocation points (training set). If a simulation does
        not execute successfully, the corresponding parameter set is removed.

    """

    # Create hdf5 metadata
    hdf5file = f'ExpDesign_{self.name}.hdf5'
    hdf5_exist = os.path.exists(hdf5file)
    file = h5py.File(hdf5file, 'a')

    # Initialization
    n_c_points = len(c_points)
    self.n_outputs = len(self.Output.names)
    all_outputs = {}

    # Extract the function
    if self.link_type.lower() == 'function':
        # Prepare the function
        Function = getattr(__import__(self.py_file), self.py_file)
    # ---------------------------------------------------------------
    # -------------- Multiprocessing with Pool Class ----------------
    # ---------------------------------------------------------------
    # Start a pool with the number of CPUs
    if self.n_cpus is None:
        n_cpus = multiprocessing.cpu_count()
    else:
        n_cpus = self.n_cpus

    # Run forward model either normal or with multiprocessing
    if not self.multi_process:
        group_results = list([self.run_forwardmodel((c_points,
                                                     prevRun_No,
                                                     key_str))])
    else:
        with multiprocessing.Pool(n_cpus) as p:
            desc = f'Running forward model {key_str}'
            if self.link_type.lower() == 'function':
                imap_var = p.imap(Function, c_points[:, np.newaxis])
            else:
                args = zip(c_points,
                           [prevRun_No+i for i in range(n_c_points)],
                           [key_str]*n_c_points)
                imap_var = p.imap(self.run_forwardmodel, args)

            group_results = list(tqdm.tqdm(imap_var, total=n_c_points,
                                           desc=desc))

    # Save time steps or x-values
    x_values = group_results[0][0]
    all_outputs["x_values"] = x_values
    if not hdf5_exist:
        if type(x_values) is dict:
            grp_x_values = file.create_group("x_values/")
            for varIdx, var in enumerate(self.Output.names):
                grp_x_values.create_dataset(var, data=x_values[var])
        else:
            file.create_dataset("x_values", data=x_values)

    # Save each output in its corresponding array
    NaN_idx = []
    for varIdx, var in enumerate(self.Output.names):

        if not hdf5_exist:
            grpY = file.create_group("EDY/"+var)
        else:
            grpY = file.get("EDY/"+var)

        Outputs = np.asarray([item[varIdx+1] for item in group_results],
                             dtype=np.float64)

        if prevRun_No == 0 and key_str == '':
            grpY.create_dataset(f'init_{key_str}', data=Outputs)
        else:
            try:
                oldEDY = np.array(file[f'EDY/{var}/adaptive_{key_str}'])
                del file[f'EDY/{var}/adaptive_{key_str}']
                data = np.vstack((oldEDY, Outputs))
            except KeyError:
                data = Outputs
            grpY.create_dataset('adaptive_'+key_str, data=data)

        NaN_idx = np.unique(np.argwhere(np.isnan(Outputs))[:, 0])
        all_outputs[var] = np.delete(Outputs, NaN_idx, axis=0)

        if prevRun_No == 0 and key_str == '':
            grpY.create_dataset(f"New_init_{key_str}",
                                data=all_outputs[var])
        else:
            try:
                name = f'EDY/{var}/New_adaptive_{key_str}'
                oldEDY = np.array(file[name])
                del file[f'EDY/{var}/New_adaptive_{key_str}']
                data = np.vstack((oldEDY, all_outputs[var]))
            except KeyError:
                data = all_outputs[var]
            grpY.create_dataset(f'New_adaptive_{key_str}', data=data)

    # Print the collocation points whose simulations crashed
    if len(NaN_idx) != 0:
        print('\n')
        print('*'*20)
        print("\nThe following parametersets have been removed:\n",
              c_points[NaN_idx])
        print("\n")
        print('*'*20)

    # Pass it to the attribute
    new_c_points = np.delete(c_points, NaN_idx, axis=0)
    self.OutputMatrix = all_outputs

    # Save CollocationPoints
    grpX = file.create_group("EDX") if not hdf5_exist else file.get("EDX")
    if prevRun_No == 0 and key_str == '':
        grpX.create_dataset("init_"+key_str, data=c_points)
        if len(NaN_idx) != 0:
            grpX.create_dataset("New_init_"+key_str, data=new_c_points)

    else:
        try:
            name = f'EDX/adaptive_{key_str}'
            oldCollocationPoints = np.array(file[name])
            del file[f'EDX/adaptive_{key_str}']
            data = np.vstack((oldCollocationPoints, new_c_points))
        except KeyError:
            data = new_c_points
        grpX.create_dataset('adaptive_'+key_str, data=data)

        if len(NaN_idx) != 0:
            try:
                name = f'EDX/New_adaptive_{key_str}'
                oldCollocationPoints = np.array(file[name])
                del file[f'EDX/New_adaptive_{key_str}']
                data = np.vstack((oldCollocationPoints, new_c_points))
            except KeyError:
                data = new_c_points
            grpX.create_dataset('New_adaptive_'+key_str, data=data)

    # Close h5py file
    file.close()

    return all_outputs, new_c_points
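
A typical call, sketched under the assumption that Model uses the 'function' wrapper and has two uncertain parameters; the sample bounds and count are illustrative:

>>> import numpy as np
>>> c_points = np.random.uniform(-5, 5, size=(100, 2))
>>> all_outputs, new_c_points = Model.run_model_parallel(c_points)
>>> y = all_outputs[Model.Output.names[0]]  # shape (n_successful_runs, n_obs)
>>> x = all_outputs['x_values']             # x-axis shared by all runs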
def zip_subdirs(self, dir_name, key)

Zips all files in subdirectories whose names contain the given key(word), then removes those subdirectories.

Parameters

dir_name : str
Directory name.
key : str
Keyword to search for.

Returns

None.

Expand source code
def zip_subdirs(self, dir_name, key):
    """
    Zips all files in subdirectories whose names contain the given
    key(word), then removes those subdirectories.

    Parameters
    ----------
    dir_name : str
        Directory name.
    key : str
        Keyword to search for.

    Returns
    -------
    None.

    """
    # setup file paths variable
    dir_list = []
    file_paths = []

    # Read all directory, subdirectories and file lists
    dir_path = os.getcwd()

    for root, directories, files in os.walk(dir_path):
        for directory in directories:
            # Create the full filepath by using os module.
            if key in directory:
                folderPath = os.path.join(dir_path, directory)
                dir_list.append(folderPath)

    # Loop over the identified directories to store the file paths
    for direct_name in dir_list:
        for root, directories, files in os.walk(direct_name):
            for filename in files:
                # Create the full filepath by using os module.
                filePath = os.path.join(root, filename)
                file_paths.append('.'+filePath.split(dir_path)[1])

    # writing files to a zipfile
    if len(file_paths) != 0:
        zip_file = zipfile.ZipFile(dir_name+'.zip', 'w')
        with zip_file:
            # writing each file one by one
            for file in file_paths:
                zip_file.write(file)

        file_paths = [path for path in os.listdir('.') if key in path]

        for path in file_paths:
            shutil.rmtree(path)

        print("\n")
        print(f'{dir_name}.zip file has been created successfully!\n')

    return
class Input

A class to define the uncertain input parameters.

Attributes

Marginals : obj
Marginal objects. See inputs.Marginal.
Rosenblatt : bool
Whether the Rosenblatt transformation is required for the dependent input parameters.

Examples

Marginals can be defined as follows:

>>> Inputs.add_marginals()
>>> Inputs.Marginals[0].name = 'X_1'
>>> Inputs.Marginals[0].dist_type = 'uniform'
>>> Inputs.Marginals[0].parameters = [-5, 5]

If no distribution information is available, the input data can be given as follows:

>>> Inputs.add_marginals()
>>> Inputs.Marginals[0].name = 'X_1'
>>> Inputs.Marginals[0].input_data = input_data
Expand source code
class Input:
    """
    A class to define the uncertain input parameters.

    Attributes
    ----------
    Marginals : obj
        Marginal objects. See `inputs.Marginal`.
    Rosenblatt : bool
        Whether the Rosenblatt transformation is required for the dependent
        input parameters.

    Examples
    --------
    Marginals can be defined as follows:

    >>> Inputs.add_marginals()
    >>> Inputs.Marginals[0].name = 'X_1'
    >>> Inputs.Marginals[0].dist_type = 'uniform'
    >>> Inputs.Marginals[0].parameters = [-5, 5]

    If no distribution information is available, the input data can be
    given as follows:

    >>> Inputs.add_marginals()
    >>> Inputs.Marginals[0].name = 'X_1'
    >>> Inputs.Marginals[0].input_data = input_data
    """
    poly_coeffs_flag = True

    def __init__(self):
        self.Marginals = []
        self.Rosenblatt = False

    def add_marginals(self):
        """
        Adds a new Marginal object to the input object.

        Returns
        -------
        None.

        """
        self.Marginals.append(Marginal())

Class variables

var poly_coeffs_flag

Methods

def add_marginals(self)

Adds a new Marginal object to the input object.

Returns

None.

Expand source code
def add_marginals(self):
    """
    Adds a new Marginal object to the input object.

    Returns
    -------
    None.

    """
    self.Marginals.append(Marginal())
class Discrepancy (InputDisc='', disc_type='Gaussian', parameters=None)

Discrepancy class for the Bayesian inference method. We define the reference or reality to be equal to what we can model plus a discrepancy term \epsilon . We consider the following format:

\textbf{y}_{\text{reality}} = \mathcal{M}(\theta) + \epsilon,

where \epsilon \in R^{N_{out}} represents the effects of measurement error and model inaccuracy. For simplicity, it can be defined as an additive Gaussian discrepancy with zero mean and given covariance matrix \Sigma :

\epsilon \sim \mathcal{N}(\epsilon|0, \Sigma).

In the context of model inversion or calibration, an observation point \textbf{y}_i \in \mathcal{y} is a realization of a Gaussian distribution with mean value of \mathcal{M}(\theta) and covariance matrix of \Sigma .

p(\textbf{y}|\theta) = \mathcal{N}(\textbf{y}|\mathcal{M}(\theta), \Sigma)

The following options are available:

  • Option A: With known residual covariance matrix \Sigma for independent measurements.

  • Option B: With unknown residual covariance matrix \Sigma, parametrized as \Sigma(\theta_{\epsilon})=\sigma^2 \textbf{I}_{N_{out}} with unknown residual variances \sigma^2. This term will be jointly inferred with the uncertain input parameters. For the inversion, you need to define a prior marginal via the Input class. Note that \sigma^2 is only a single scalar multiplier for the diagonal entries of the covariance matrix \Sigma.

Attributes

InputDisc : obj
Input object, used when \sigma^2 is to be inferred jointly with the parameters (Option B). If multiple output groups are defined by Model.Output.names, each model output needs to have a prior marginal defined using the Input class. The default is ''.
disc_type : str
Type of the noise definition. Only 'Gaussian' is supported so far.
parameters : dict or pandas.DataFrame
Known residual variance \sigma^2, i.e. diagonal entry of the covariance matrix of the multivariate normal likelihood in case of Option A.
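
A minimal sketch for Option A, where the residual variances are known; the output name 'pressure' and the variance values are illustrative assumptions:

>>> obs_sigma2 = {'pressure': [0.05 ** 2] * 3}  # one variance per observation
>>> DiscrepancyOpts = Discrepancy(disc_type='Gaussian', parameters=obs_sigma2)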
Expand source code
class Discrepancy:
    """
    Discrepancy class for the Bayesian inference method.
    We define the reference or reality to be equal to what we can model plus a
    discrepancy term \\( \\epsilon \\). We consider the following format:

    $$\\textbf{y}_{\\text{reality}} = \\mathcal{M}(\\theta) + \\epsilon,$$

    where \\( \\epsilon \\in R^{N_{out}} \\) represents the effects of
    measurement error and model inaccuracy. For simplicity, it can be defined
    as an additive Gaussian discrepancy with zero mean and given covariance
    matrix \\( \\Sigma \\):

    $$\\epsilon \\sim \\mathcal{N}(\\epsilon|0, \\Sigma). $$

    In the context of model inversion or calibration, an observation point
    \\( \\textbf{y}_i \\in \\mathcal{y} \\) is a realization of a Gaussian
    distribution with mean value of \\(\\mathcal{M}(\\theta) \\) and covariance
    matrix of \\( \\Sigma \\).

    $$ p(\\textbf{y}|\\theta) = \\mathcal{N}(\\textbf{y}|\\mathcal{M}
                                             (\\theta), \\Sigma)$$

    The following options are available:

    * Option A: With known residual covariance matrix \\(\\Sigma\\) for
    independent measurements.

    * Option B: With unknown residual covariance matrix \\(\\Sigma\\),
    parametrized as \\(\\Sigma(\\theta_{\\epsilon})=\\sigma^2 \\textbf{I}_
    {N_{out}}\\) with unknown residual variances \\(\\sigma^2\\).
    This term will be jointly inferred with the uncertain input parameters.
    For the inversion, you need to define a prior marginal via the `Input`
    class. Note
    that \\(\\sigma^2\\) is only a single scalar multiplier for the diagonal
    entries of the covariance matrix \\(\\Sigma\\).

    Attributes
    ----------
    InputDisc : obj
        Input object, used when \\(\\sigma^2\\) is to be inferred jointly
        with the parameters (`Option B`). If multiple output groups are
        defined by `Model.Output.names`, each model output needs to have a
        prior marginal defined using the `Input` class. The default is `''`.
    disc_type : str
        Type of the noise definition. Only `'Gaussian'` is supported so far.
    parameters : dict or pandas.DataFrame
        Known residual variance \\(\\sigma^2\\), i.e. diagonal entry of the
        covariance matrix of the multivariate normal likelihood in case of
        `Option A`.

    """

    def __init__(self, InputDisc='', disc_type='Gaussian', parameters=None):
        self.InputDisc = InputDisc
        self.disc_type = disc_type
        self.parameters = parameters

    # -------------------------------------------------------------------------
    def get_sample(self, n_samples):
        """
        Generate samples for the \\(\\sigma^2\\), i.e. the diagonal entries of
        the variance-covariance matrix in the multivariate normal distribution.

        Parameters
        ----------
        n_samples : int
            Number of samples (parameter sets).

        Returns
        -------
        sigma2_prior: array of shape (n_samples, n_params)
            \\(\\sigma^2\\) samples.

        """
        self.n_samples = n_samples
        ExpDesign = ExpDesigns(self.InputDisc)
        self.sigma2_prior = ExpDesign.generate_ED(
            n_samples, sampling_method='random', max_pce_deg=1
            )
        # Store BoundTuples
        self.ExpDesign = ExpDesign

        # Naive approach: Fit a gaussian kernel to the provided data
        self.ExpDesign.JDist = stats.gaussian_kde(ExpDesign.raw_data)

        # Save the names of sigmas
        if len(self.InputDisc.Marginals) != 0:
            self.name = []
            for Marginalidx in range(len(self.InputDisc.Marginals)):
                self.name.append(self.InputDisc.Marginals[Marginalidx].name)

        return self.sigma2_prior

Methods

def get_sample(self, n_samples)

Generate samples for the \sigma^2, i.e. the diagonal entries of the variance-covariance matrix in the multivariate normal distribution.

Parameters

n_samples : int
Number of samples (parameter sets).

Returns

sigma2_prior : array of shape (n_samples, n_params)
\sigma^2 samples.
Expand source code
def get_sample(self, n_samples):
    """
    Generate samples for the \\(\\sigma^2\\), i.e. the diagonal entries of
    the variance-covariance matrix in the multivariate normal distribution.

    Parameters
    ----------
    n_samples : int
        Number of samples (parameter sets).

    Returns
    -------
    sigma2_prior: array of shape (n_samples, n_params)
        \\(\\sigma^2\\) samples.

    """
    self.n_samples = n_samples
    ExpDesign = ExpDesigns(self.InputDisc)
    self.sigma2_prior = ExpDesign.generate_ED(
        n_samples, sampling_method='random', max_pce_deg=1
        )
    # Store BoundTuples
    self.ExpDesign = ExpDesign

    # Naive approach: Fit a gaussian kernel to the provided data
    self.ExpDesign.JDist = stats.gaussian_kde(ExpDesign.raw_data)

    # Save the names of sigmas
    if len(self.InputDisc.Marginals) != 0:
        self.name = []
        for Marginalidx in range(len(self.InputDisc.Marginals)):
            self.name.append(self.InputDisc.Marginals[Marginalidx].name)

    return self.sigma2_prior
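
A hedged usage sketch for Option B (assuming `DiscInputOpts` is an `Input` object whose marginals are defined via raw data samples, since the method fits a Gaussian KDE to them; the names are illustrative):

>>> DiscrepancyOpts = Discrepancy(InputDisc=DiscInputOpts)
>>> sigma2_prior = DiscrepancyOpts.get_sample(n_samples=1000)
>>> # sigma2_prior has shape (1000, n_sigma2), one column per marginal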
class MetaModel (input_obj, meta_model_type='PCE', pce_reg_method='OLS', pce_deg=1, pce_q_norm=1.0, dim_red_method='no', verbose=False)

Meta (surrogate) model

This class trains a surrogate model. It accepts an input object (input_obj) containing the specification of the distributions for uncertain parameters and a model object with instructions on how to run the computational model.

Attributes

input_obj : obj
Input object with the information on the model input parameters.
meta_model_type : str
Surrogate model types. Three surrogate model types are supported: polynomial chaos expansion (PCE), arbitrary PCE (aPCE) and Gaussian process regression (GPE). Default is PCE.
pce_reg_method : str

PCE regression method to compute the coefficients. The following regression methods are available:

  1. OLS: Ordinary Least Square method
  2. BRR: Bayesian Ridge Regression
  3. LARS: Least angle regression
  4. ARD: Bayesian ARD Regression
  5. FastARD: Fast Bayesian ARD Regression
  6. VBL: Variational Bayesian Learning
  7. EBL: Empirical Bayesian Learning

Default is OLS.
pce_deg : int or list of int
Polynomial degree(s). If a list is given, an adaptive algorithm is used to find the best degree with the lowest Leave-One-Out cross-validation (LOO) error (or the highest score=1-LOO). Default is 1.
pce_q_norm : float
Hyperbolic (or q-norm) truncation for multi-indices of multivariate polynomials. Default is 1.0.
dim_red_method : str
Dimensionality reduction method for the output space. The available method is based on principal component analysis (PCA). The default is 'no'. There are two ways to select the number of components: use a percentage-of-explained-variance threshold (between 0 and 100) (Option A) or directly prescribe the number of components (Option B):
>>> MetaModelOpts.dim_red_method = 'PCA'
>>> MetaModelOpts.var_pca_threshold = 99.999  # Option A
>>> MetaModelOpts.n_pca_components = 12 # Option B
verbose : bool
Prints summary of the regression results. Default is False.

Note

To define the sampling methods and the training set, an experimental design instance shall be defined. This can be done by:

>>> MetaModelOpts.add_ExpDesign()

Two experimental design schemes are supported: one-shot (normal) and adaptive sequential (sequential) designs. For experimental design refer to ExpDesigns.
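
A hedged end-to-end sketch (assuming `Inputs` is a configured `Input` object and `Model` a configured forward model; the option values are illustrative):

>>> MetaModelOpts = MetaModel(Inputs, meta_model_type='PCE',
...                           pce_deg=[1, 2, 3], pce_q_norm=0.85)
>>> MetaModelOpts.add_ExpDesign()
>>> MetaModelOpts.ExpDesign.method = 'normal'
>>> MetaModelOpts.ExpDesign.n_init_samples = 100
>>> MetaModelOpts.ExpDesign.sampling_method = 'random'
>>> PCEModel = MetaModelOpts.create_metamodel(Model)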

Expand source code
class MetaModel:
    """
    Meta (surrogate) model

    This class trains a surrogate model. It accepts an input object (input_obj)
    containing the specification of the distributions for uncertain parameters
    and a model object with instructions on how to run the computational model.

    Attributes
    ----------
    input_obj : obj
        Input object with the information on the model input parameters.
    meta_model_type : str
        Surrogate model types. Three surrogate model types are supported:
        polynomial chaos expansion (`PCE`), arbitrary PCE (`aPCE`) and
        Gaussian process regression (`GPE`). Default is PCE.
    pce_reg_method : str
        PCE regression method to compute the coefficients. The following
        regression methods are available:

        1. OLS: Ordinary Least Square method
        2. BRR: Bayesian Ridge Regression
        3. LARS: Least angle regression
        4. ARD: Bayesian ARD Regression
        5. FastARD: Fast Bayesian ARD Regression
        6. VBL: Variational Bayesian Learning
        7. EBL: Empirical Bayesian Learning
        Default is `OLS`.
    pce_deg : int or list of int
        Polynomial degree(s). If a list is given, an adaptive algorithm is used
        to find the best degree with the lowest Leave-One-Out cross-validation
        (LOO) error (or the highest score=1-LOO). Default is `1`.
    pce_q_norm : float
        Hyperbolic (or q-norm) truncation for multi-indices of multivariate
        polynomials. Default is `1.0`.
    dim_red_method : str
        Dimensionality reduction method for the output space. The available
        method is based on principal component analysis (PCA). The default
        is `'no'`. There are two ways to select the number of components:
        use a percentage-of-explained-variance threshold (between 0 and 100)
        (Option A) or directly prescribe the number of components (Option B):

            >>> MetaModelOpts.dim_red_method = 'PCA'
            >>> MetaModelOpts.var_pca_threshold = 99.999  # Option A
            >>> MetaModelOpts.n_pca_components = 12 # Option B

    verbose : bool
        Prints summary of the regression results. Default is `False`.

    Note
    -------
    To define the sampling methods and the training set, an experimental design
    instance shall be defined. This can be done by:

    >>> MetaModelOpts.add_ExpDesign()

    Two experimental design schemes are supported: one-shot (`normal`) and
    adaptive sequential (`sequential`) designs.
    For experimental design refer to `ExpDesigns`.

    """

    def __init__(self, input_obj, meta_model_type='PCE', pce_reg_method='OLS',
                 pce_deg=1, pce_q_norm=1.0, dim_red_method='no',
                 verbose=False):

        self.input_obj = input_obj
        self.meta_model_type = meta_model_type
        self.pce_reg_method = pce_reg_method
        self.pce_deg = pce_deg
        self.pce_q_norm = pce_q_norm
        self.dim_red_method = dim_red_method
        self.verbose = verbose

    # -------------------------------------------------------------------------
    def create_metamodel(self, Model):
        """
        Starts the training of the meta-model for the model object containing
        the given computational model.

        Parameters
        ----------
        Model : obj
            Model object.

        Returns
        -------
        metamodel : obj
            The meta model object.

        """
        self.ModelObj = Model
        self.n_params = len(self.input_obj.Marginals)
        self.ExpDesignFlag = 'normal'
        # --- Prepare pce degree ---
        if self.meta_model_type.lower() == 'pce':
            if type(self.pce_deg) is not np.ndarray:
                self.pce_deg = np.array(self.pce_deg)

        if self.ExpDesign.method == 'sequential':
            from .sequential_design import SeqDesign
            seq_design = SeqDesign(self)
            metamodel = seq_design.train_seq_design(Model)

        elif self.ExpDesign.method == 'normal':
            self.ExpDesignFlag = 'normal'
            metamodel = self.train_norm_design(Model)

        else:
            raise Exception("The method for experimental design you requested"
                            " has not been implemented yet.")

        # Zip the model run directories
        if self.ModelObj.link_type.lower() == 'pylink':
            Model.zip_subdirs(Model.name, f'{Model.name}_')

        return metamodel

    # -------------------------------------------------------------------------
    def train_norm_design(self, Model, verbose=False):
        """
        This function loops over the outputs and each time step/point and fits
        the meta model.

        Parameters
        ----------
        Model : obj
            Model object.
        verbose : bool, optional
            Flag for a sequential design in silent mode. The default is False.

        Returns
        -------
        self: obj
            Meta-model object.

        """

        # Get the collocation points to run the forward model
        CollocationPoints, OutputDict = self.generate_ExpDesign(Model)

        # Initialize the nested dictionaries
        self.deg_dict = self.auto_vivification()
        self.q_norm_dict = self.auto_vivification()
        self.coeffs_dict = self.auto_vivification()
        self.basis_dict = self.auto_vivification()
        self.score_dict = self.auto_vivification()
        self.clf_poly = self.auto_vivification()
        self.gp_poly = self.auto_vivification()
        self.pca = self.auto_vivification()
        self.LCerror = self.auto_vivification()
        self.x_scaler = {}

        # Define the DegreeArray
        nSamples, ndim = CollocationPoints.shape
        self.DegreeArray = self.__select_degree(ndim, nSamples)

        # Generate all basis indices
        self.allBasisIndices = self.auto_vivification()
        for deg in self.DegreeArray:
            keys = self.allBasisIndices.keys()
            if deg not in np.fromiter(keys, dtype=float):
                # Generate the polynomial basis indices
                for qidx, q in enumerate(self.pce_q_norm):
                    basis_indices = self.create_basis_indices(degree=deg,
                                                              q_norm=q)
                    self.allBasisIndices[str(deg)][str(q)] = basis_indices

        # Evaluate the univariate polynomials on ExpDesign
        if self.meta_model_type.lower() != 'gpe':
            self.univ_p_val = self.univ_basis_vals(CollocationPoints)

        if 'x_values' in OutputDict:
            self.ExpDesign.x_values = OutputDict['x_values']
            del OutputDict['x_values']

        # --- Loop through data points and fit the surrogate ---
        if not verbose:
            print(f"\n>>>> Training the {self.meta_model_type} metamodel "
                  "started. <<<<<<\n")
            items = tqdm(OutputDict.items(), desc="Fitting regression")
        else:
            items = OutputDict.items()

        # For loop over the components/outputs
        for key, Output in items:

            # Dimensionality reduction with PCA, if specified
            if self.dim_red_method.lower() == 'pca':
                self.pca[key], target = self.pca_transformation(Output)
            else:
                target = Output

            # Parallel fit regression
            if self.meta_model_type.lower() == 'gpe':
                # Prepare the input matrix
                scaler = MinMaxScaler()
                X_S = scaler.fit_transform(CollocationPoints)

                self.x_scaler[key] = scaler

                out = Parallel(n_jobs=-1, prefer='threads')(
                    delayed(self.gaussian_process_emulator)(X_S, target[:, idx])
                    for idx in range(target.shape[1]))

                for idx in range(target.shape[1]):
                    self.gp_poly[key][f"y_{idx+1}"] = out[idx]

            else:
                out = Parallel(n_jobs=-1, prefer='threads')(
                    delayed(self.adaptive_regression)(CollocationPoints,
                                                      target[:, idx], idx)
                    for idx in range(target.shape[1]))

                for i in range(target.shape[1]):
                    # Create a dict to pass the variables
                    self.deg_dict[key][f"y_{i+1}"] = out[i]['degree']
                    self.q_norm_dict[key][f"y_{i+1}"] = out[i]['qnorm']
                    self.coeffs_dict[key][f"y_{i+1}"] = out[i]['coeffs']
                    self.basis_dict[key][f"y_{i+1}"] = out[i]['multi_indices']
                    self.score_dict[key][f"y_{i+1}"] = out[i]['LOOCVScore']
                    self.clf_poly[key][f"y_{i+1}"] = out[i]['clf_poly']
                    self.LCerror[key][f"y_{i+1}"] = out[i]['LCerror']

        if not verbose:
            print(f"\n>>>> Training the {self.meta_model_type} metamodel"
                  " sucessfully completed. <<<<<<\n")

        return self

    # -------------------------------------------------------------------------
    def create_basis_indices(self, degree, q_norm):
        """
        Creates set of selected multi-indices of multivariate polynomials for
        certain parameter numbers, polynomial degree, hyperbolic (or q-norm)
        truncation scheme.

        Parameters
        ----------
        degree : int
            Polynomial degree.
        q_norm : float
            Hyperbolic (or q-norm) truncation.

        Returns
        -------
        basis_indices : array of shape (n_terms, n_params)
            Multi-indices of multivariate polynomials.

        """
        basis_indices = glexindex(start=0, stop=degree+1,
                                  dimensions=self.n_params,
                                  cross_truncation=q_norm,
                                  reverse=False, graded=True)
        return basis_indices

    # -------------------------------------------------------------------------
    def add_ExpDesign(self):
        """
        Instantiates the experimental design object.

        Returns
        -------
        None.

        """
        self.ExpDesign = ExpDesigns(self.input_obj,
                                    meta_Model=self.meta_model_type)

    # -------------------------------------------------------------------------
    def generate_ExpDesign(self, Model):
        """
        Prepares the experimental design either by reading from the prescribed
        data or running simulations.

        Parameters
        ----------
        Model : obj
            Model object.

        Raises
        ------
        Exception
            If model simulations are not provided properly.

        Returns
        -------
        ED_X_tr: array of shape (n_samples, n_params)
            Training samples transformed by an isoprobabilistic transformation.
        ED_Y: dict
            Model simulations (target) for all outputs.
        """
        ExpDesign = self.ExpDesign
        if self.ExpDesignFlag != 'sequential':
            # Read ExpDesign (training and targets) from the provided hdf5
            if ExpDesign.hdf5_file is not None:

                # Read hdf5 file
                f = h5py.File(ExpDesign.hdf5_file, 'r+')

                # Read EDX and pass it to ExpDesign object
                try:
                    ExpDesign.X = np.array(f["EDX/New_init_"])
                except KeyError:
                    ExpDesign.X = np.array(f["EDX/init_"])

                # Update number of initial samples
                ExpDesign.n_init_samples = ExpDesign.X.shape[0]

                # Read EDX and pass it to ExpDesign object
                out_names = self.ModelObj.Output.names
                ExpDesign.Y = {}

                # Extract x values
                try:
                    ExpDesign.Y["x_values"] = dict()
                    for varIdx, var in enumerate(out_names):
                        x = np.array(f[f"x_values/{var}"])
                        ExpDesign.Y["x_values"][var] = x
                except KeyError:
                    ExpDesign.Y["x_values"] = np.array(f["x_values"])

                # Store the output
                for varIdx, var in enumerate(out_names):
                    try:
                        y = np.array(f[f"EDY/{var}/New_init_"])
                    except KeyError:
                        y = np.array(f[f"EDY/{var}/init_"])
                    ExpDesign.Y[var] = y
                f.close()
            else:
                # Check if an old hdf5 file exists: if yes, rename it
                hdf5file = f'ExpDesign_{self.ModelObj.name}.hdf5'
                if os.path.exists(hdf5file):
                    os.rename(hdf5file, 'old_'+hdf5file)

        # ---- Prepare X samples ----
        ED_X, ED_X_tr = ExpDesign.generate_ED(ExpDesign.n_init_samples,
                                              ExpDesign.sampling_method,
                                              transform=True,
                                              max_pce_deg=np.max(self.pce_deg))
        ExpDesign.X = ED_X
        ExpDesign.collocationPoints = ED_X_tr
        self.bound_tuples = ExpDesign.bound_tuples

        # ---- Run simulations at X ----
        if not hasattr(ExpDesign, 'Y') or ExpDesign.Y is None:
            print('\n Now the forward model needs to be run!\n')
            ED_Y, up_ED_X = Model.run_model_parallel(ED_X)
            ExpDesign.X = up_ED_X
            self.ModelOutputDict = ED_Y
            ExpDesign.Y = ED_Y
        else:
            # Check if a dict has been passed.
            if type(ExpDesign.Y) is dict:
                self.ModelOutputDict = ExpDesign.Y
            else:
                raise Exception('Please provide either a dictionary or an '
                                'hdf5 file via the ExpDesign.hdf5_file '
                                'argument.')

        return ED_X_tr, self.ModelOutputDict

    # -------------------------------------------------------------------------
    def univ_basis_vals(self, samples, n_max=None):
        """
        Evaluates univariate regressors along input directions.

        Parameters
        ----------
        samples : array of shape (n_samples, n_params)
            Samples.
        n_max : int, optional
            Maximum polynomial degree. The default is `None`.

        Returns
        -------
        univ_basis: array of shape (n_samples, n_params, n_max+1)
            All univariate regressors up to n_max.
        """
        # Extract information
        poly_types = self.ExpDesign.poly_types
        if samples.ndim != 2:
            samples = samples.reshape(1, len(samples))
        n_max = np.max(self.pce_deg) if n_max is None else n_max

        # Extract poly coeffs
        if self.ExpDesign.input_data_given or self.ExpDesign.apce:
            apolycoeffs = self.ExpDesign.polycoeffs
        else:
            apolycoeffs = None

        # Evaluate univariate basis
        univ_basis = eval_univ_basis(samples, n_max, poly_types, apolycoeffs)

        return univ_basis

    # -------------------------------------------------------------------------
    def create_psi(self, basis_indices, univ_p_val):
        """
        This function assemble the design matrix Psi from the given basis index
        set INDICES and the univariate polynomial evaluations univ_p_val.

        Parameters
        ----------
        basis_indices : array of shape (n_terms, n_params)
            Multi-indices of multivariate polynomials.
        univ_p_val : array of (n_samples, n_params, n_max+1)
            All univariate regressors up to `n_max`.

        Raises
        ------
        ValueError
            n_terms in arguments do not match.

        Returns
        -------
        psi : array of shape (n_samples, n_terms)
            Multivariate regressors.

        """
        # Check if BasisIndices is a sparse matrix
        sparsity = sp.sparse.issparse(basis_indices)
        if sparsity:
            basis_indices = basis_indices.toarray()

        # Initialization and consistency checks
        # number of input variables
        n_params = univ_p_val.shape[1]

        # Size of the experimental design
        n_samples = univ_p_val.shape[0]

        # number of basis terms
        n_terms = basis_indices.shape[0]

        # check that the variables have consistent sizes
        if n_params != basis_indices.shape[1]:
            raise ValueError("The shapes of basis_indices and univ_p_val don't"
                             " match!!")

        # Preallocate the Psi matrix for performance
        psi = np.ones((n_samples, n_terms))
        # Assemble the Psi matrix
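        # Each row of basis_indices prescribes one multivariate term; its
        # value at a sample is the product over input dimensions of the
        # corresponding univariate polynomial evaluations. Columns with a
        # zero index contribute a factor of 1 and are skipped below.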
        for m in range(basis_indices.shape[1]):
            aa = np.where(basis_indices[:, m] > 0)[0]
            try:
                basisIdx = basis_indices[aa, m]
                bb = np.reshape(univ_p_val[:, m, basisIdx], psi[:, aa].shape)
                psi[:, aa] = np.multiply(psi[:, aa], bb)
            except ValueError as err:
                raise err

        return psi

    # -------------------------------------------------------------------------
    def fit(self, X, y, basis_indices, reg_method=None):
        """
        Fit regression using the regression method provided.

        Parameters
        ----------
        X : array of shape (n_samples, n_features)
            Training vector, where n_samples is the number of samples and
            n_features is the number of features.
        y : array of shape (n_samples,)
            Target values.
        basis_indices : array of shape (n_terms, n_params)
            Multi-indices of multivariate polynomials.
        reg_method : str, optional
            Regression method to use. If None, `self.pce_reg_method` is
            applied. The default is None.

        Returns
        -------
        return_out_dict : Dict
            Fitted estimator (`clf_poly`), sparse multi-indices
            (`spareMulti-Index`), sparse design matrix (`sparePsi`) and
            coefficients (`coeffs`).

        """
        if reg_method is None:
            reg_method = self.pce_reg_method

        # Check if BasisIndices is a sparse matrix
        sparsity = sp.sparse.issparse(basis_indices)

        clf_poly = []
        compute_score = True if self.verbose else False

        #  inverse of the observed variance of the data
        if np.var(y) != 0:
            Lambda = 1 / np.var(y)
        else:
            Lambda = 1e-6

        # Bayes sparse adaptive aPCE
        if reg_method.lower() != 'ols':
            if reg_method.lower() == 'brr' or np.var(y) == 0:
                clf_poly = lm.BayesianRidge(n_iter=1000, tol=1e-7,
                                            fit_intercept=True,
                                            normalize=True,
                                            compute_score=compute_score,
                                            alpha_1=1e-04, alpha_2=1e-04,
                                            lambda_1=Lambda, lambda_2=Lambda)
                clf_poly.converged = True

            elif reg_method.lower() == 'ard':
                clf_poly = lm.ARDRegression(fit_intercept=True,
                                            normalize=True,
                                            compute_score=compute_score,
                                            n_iter=1000, tol=0.0001,
                                            alpha_1=1e-3, alpha_2=1e-3,
                                            lambda_1=Lambda, lambda_2=Lambda)

            elif reg_method.lower() == 'fastard':
                clf_poly = RegressionFastARD(fit_intercept=True,
                                             normalize=True,
                                             compute_score=compute_score,
                                             n_iter=300, tol=1e-10)

            elif reg_method.lower() == 'bcs':
                clf_poly = RegressionFastLaplace(fit_intercept=False,
                                                 n_iter=1000, tol=1e-7)

            elif reg_method.lower() == 'lars':
                clf_poly = lm.LassoLarsCV(fit_intercept=False)

            elif reg_method.lower() == 'sgdr':
                clf_poly = lm.SGDRegressor(fit_intercept=False,
                                           max_iter=5000, tol=1e-7)

            elif reg_method.lower() == 'omp':
                clf_poly = lm.OrthogonalMatchingPursuitCV(fit_intercept=False,
                                                          max_iter=10)

            elif reg_method.lower() == 'vbl':
                clf_poly = VBLinearRegression(fit_intercept=False)

            elif reg_method.lower() == 'ebl':
                clf_poly = EBLinearRegression(optimizer='em')

            # Fit
            clf_poly.fit(X, y)

            # Select the nonzero entries of coefficients
            # The first column must be kept (For mean calculations)
            nnz_idx = np.nonzero(clf_poly.coef_)[0]

            if len(nnz_idx) == 0 or nnz_idx[0] != 0:
                nnz_idx = np.insert(np.nonzero(clf_poly.coef_)[0], 0, 0)
                # Remove the zero entries for Bases and PSI if need be
                if sparsity:
                    sparse_basis_indices = basis_indices.toarray()[nnz_idx]
                else:
                    sparse_basis_indices = basis_indices[nnz_idx]
                sparse_X = X[:, nnz_idx]

                # Store the coefficients of the regression model
                clf_poly.fit(sparse_X, y)
                coeffs = clf_poly.coef_
            else:
                # This is for the case where all outputs are zero, thereby
                # all coefficients are zero
                if sparsity:
                    sparse_basis_indices = basis_indices.toarray()
                else:
                    sparse_basis_indices = basis_indices
                sparse_X = X
                coeffs = clf_poly.coef_

        # Ordinary least square method (OLS)
        else:
            if sparsity:
                sparse_basis_indices = basis_indices.toarray()
            else:
                sparse_basis_indices = basis_indices
            sparse_X = X

            X_T_X = np.dot(sparse_X.T, sparse_X)

            if np.linalg.cond(X_T_X) > 1e-12 and \
               np.linalg.cond(X_T_X) < 1 / sys.float_info.epsilon:
                # faster
                coeffs = sp.linalg.solve(X_T_X, np.dot(sparse_X.T, y))
            else:
                # stabler
                coeffs = np.dot(np.dot(np.linalg.pinv(X_T_X), sparse_X.T), y)

        # Create a dict to pass the outputs
        return_out_dict = dict()
        return_out_dict['clf_poly'] = clf_poly
        return_out_dict['spareMulti-Index'] = sparse_basis_indices
        return_out_dict['sparePsi'] = sparse_X
        return_out_dict['coeffs'] = coeffs
        return return_out_dict

    # --------------------------------------------------------------------------------------------------------
    def adaptive_regression(self, ED_X, ED_Y, varIdx, verbose=False):
        """
        Adaptively fits the PCE model by comparing the scores of different
        degrees and q-norm.

        Parameters
        ----------
        ED_X : array of shape (n_samples, n_params)
            Experimental design.
        ED_Y : array of shape (n_samples,)
            Target values, i.e. simulation results for the Experimental design.
        varIdx : int
            Index of the output.
        verbose : bool, optional
            Print out summary. The default is False.

        Returns
        -------
        returnVars : Dict
            Fitted estimator, best degree, best q-norm, LOOCVScore and
            coefficients.

        """

        NrSamples, n_params = ED_X.shape
        # Initialization
        qAllCoeffs, AllCoeffs = {}, {}
        qAllIndices_Sparse, AllIndices_Sparse = {}, {}
        qAllclf_poly, Allclf_poly = {}, {}
        qAllnTerms, AllnTerms = {}, {}
        qAllLCerror, AllLCerror = {}, {}

        # Extract degree array and qnorm array
        DegreeArray = np.array([*self.allBasisIndices], dtype=int)
        qnorm = [*self.allBasisIndices[str(int(DegreeArray[0]))]]

        # Some options for EarlyStop
        errorIncreases = False
        # Stop degree, if LOO error does not decrease n_checks_degree times
        n_checks_degree = 3
        # Stop qNorm, if criterion isn't fulfilled n_checks_qNorm times
        n_checks_qNorm = 2
        nqnorms = len(qnorm)
        qNormEarlyStop = True
        if nqnorms < n_checks_qNorm+1:
            qNormEarlyStop = False

        # =====================================================================
        # basis adaptive polynomial chaos: repeat the calculation by increasing
        # polynomial degree until the highest accuracy is reached
        # =====================================================================
        # For each degree check all q-norms and choose the best one
        scores = -np.inf * np.ones(DegreeArray.shape[0])
        qNormScores = -np.inf * np.ones(nqnorms)

        for degIdx, deg in enumerate(DegreeArray):

            for qidx, q in enumerate(qnorm):

                # Extract the polynomial basis indices from the pool of
                # allBasisIndices
                BasisIndices = self.allBasisIndices[str(deg)][str(q)]

                # Assemble the Psi matrix
                Psi = self.create_psi(BasisIndices, self.univ_p_val)

                # Calculate the coefficients of the meta model
                outs = self.fit(Psi, ED_Y, BasisIndices)

                # Calculate and save the score of LOOCV
                score, LCerror = self.corr_loocv_error(outs['clf_poly'],
                                                       outs['sparePsi'],
                                                       outs['coeffs'],
                                                       ED_Y)

                # Check the convergence of noise for FastARD
                if self.pce_reg_method == 'FastARD' and \
                   outs['clf_poly'].alpha_ < np.finfo(np.float32).eps:
                    score = -np.inf

                qNormScores[qidx] = score
                qAllCoeffs[str(qidx+1)] = outs['coeffs']
                qAllIndices_Sparse[str(qidx+1)] = outs['spareMulti-Index']
                qAllclf_poly[str(qidx+1)] = outs['clf_poly']
                qAllnTerms[str(qidx+1)] = BasisIndices.shape[0]
                qAllLCerror[str(qidx+1)] = LCerror

                # EarlyStop check
                # if there are at least n_checks_qNorm entries after the
                # best one, we stop
                if qNormEarlyStop and \
                   sum(np.isfinite(qNormScores)) > n_checks_qNorm:
                    # If the error has increased the last two iterations, stop!
                    qNormScores_nonInf = qNormScores[np.isfinite(qNormScores)]
                    deltas = np.sign(np.diff(qNormScores_nonInf))
                    if sum(deltas[-n_checks_qNorm+1:]) == 2:
                        # stop the q-norm loop here
                        break
                if np.var(ED_Y) == 0:
                    break

            # Store the score in the scores list
            best_q = np.nanargmax(qNormScores)
            scores[degIdx] = qNormScores[best_q]

            AllCoeffs[str(degIdx+1)] = qAllCoeffs[str(best_q+1)]
            AllIndices_Sparse[str(degIdx+1)] = qAllIndices_Sparse[str(best_q+1)]
            Allclf_poly[str(degIdx+1)] = qAllclf_poly[str(best_q+1)]
            AllnTerms[str(degIdx+1)] = qAllnTerms[str(best_q+1)]
            AllLCerror[str(degIdx+1)] = qAllLCerror[str(best_q+1)]

            # Check the direction of the error (on average):
            # if it increases consistently stop the iterations
            if len(scores[scores != -np.inf]) > n_checks_degree:
                scores_nonInf = scores[scores != -np.inf]
                ss = np.sign(scores_nonInf - np.max(scores_nonInf))
                # ss < 0 means the score dropped below the best one seen
                errorIncreases = np.sum(ss[-n_checks_degree:]) <= \
                    -1*n_checks_degree

            if errorIncreases:
                break

            # Check only one degree, if target matrix has zero variance
            if np.var(ED_Y) == 0:
                break

        # ------------------ Summary of results ------------------
        # Select the one with the best score and save the necessary outputs
        best_deg = np.nanargmax(scores)+1
        coeffs = AllCoeffs[str(best_deg)]
        basis_indices = AllIndices_Sparse[str(best_deg)]
        clf_poly = Allclf_poly[str(best_deg)]
        LOOCVScore = np.nanmax(scores)
        P = AllnTerms[str(best_deg)]
        LCerror = AllLCerror[str(best_deg)]
        degree = DegreeArray[np.nanargmax(scores)]
        qnorm = float(qnorm[best_q])

        # ------------------ Print out Summary of results ------------------
        if self.verbose:
            # Create PSI_Sparse by removing redundant terms
            nnz_idx = np.nonzero(coeffs)[0]
            BasisIndices_Sparse = basis_indices[nnz_idx]

            print(f'Output variable {varIdx+1}:')
            print('The estimation of PCE coefficients converged at polynomial '
                  f'degree {DegreeArray[best_deg-1]} with '
                  f'{len(BasisIndices_Sparse)} terms (Sparsity index = '
                  f'{round(len(BasisIndices_Sparse)/P, 3)}).')

            print(f'Final ModLOO error estimate: {1-max(scores):.3e}')
            print('\n'+'-'*50)

        if verbose:
            print('='*50)
            print(' '*10 + ' Summary of results ')
            print('='*50)

            print("scores:\n", scores)
            print("Best score's degree:", self.DegreeArray[best_deg-1])
            print("NO. of terms:", len(basis_indices))
            print("Sparsity index:", round(len(basis_indices)/P, 3))
            print("Best Indices:\n", basis_indices)

            if self.pce_reg_method in ['BRR', 'ARD']:
                fig, ax = plt.subplots(figsize=(12, 10))
                plt.title("Marginal log-likelihood")
                plt.plot(clf_poly.scores_, color='navy', linewidth=2)
                plt.ylabel("Score")
                plt.xlabel("Iterations")
                if self.pce_reg_method.lower() == 'brr':
                    text = (f"$\\alpha={clf_poly.alpha_:.1f}$\n"
                            f"$\\lambda={clf_poly.lambda_:.3f}$\n"
                            f"$L={clf_poly.scores_[-1]:.1f}$")
                else:
                    text = (f"$\\alpha={clf_poly.alpha_:.1f}$\n"
                            f"$L={clf_poly.scores_[-1]:.1f}$")

                plt.text(0.75, 0.5, text, fontsize=18, transform=ax.transAxes)
                plt.show()
            print('='*80)

        # Create a dict to pass the outputs
        returnVars = dict()
        returnVars['clf_poly'] = clf_poly
        returnVars['degree'] = degree
        returnVars['qnorm'] = qnorm
        returnVars['coeffs'] = coeffs
        returnVars['multi_indices'] = basis_indices
        returnVars['LOOCVScore'] = LOOCVScore
        returnVars['LCerror'] = LCerror

        return returnVars

    # -------------------------------------------------------------------------
    def corr_loocv_error(self, clf, psi, coeffs, y):
        """
        Calculates the corrected LOO error for regression on regressor
        matrix `psi` that generated the coefficients based on [1] and [2].

        [1] Blatman, G., 2009. Adaptive sparse polynomial chaos expansions for
            uncertainty propagation and sensitivity analysis (Doctoral
            dissertation, Clermont-Ferrand 2).

        [2] Blatman, G. and Sudret, B., 2011. Adaptive sparse polynomial chaos
            expansion based on least angle regression. Journal of computational
            Physics, 230(6), pp.2345-2367.

        Parameters
        ----------
        clf : object
            Fitted estimator.
        psi : array of shape (n_samples, n_features)
            The multivariate orthogonal polynomials (regressor).
        coeffs : array-like of shape (n_features,)
            Estimated coefficients.
        y : array of shape (n_samples,)
            Target values.

        Returns
        -------
        Q_2 : float
            LOOCV validation score (1 - corrected LOOCV error).
        residual : array of shape (n_samples,)
            Residual values (predicted targets - y).

        """
        psi = np.array(psi, dtype=float)

        # Create PSI_Sparse by removing redundant terms
        nnz_idx = np.nonzero(coeffs)[0]
        if len(nnz_idx) == 0:
            nnz_idx = [0]
        psi_sparse = psi[:, nnz_idx]

        # NrCoeffs of aPCEs
        P = len(nnz_idx)
        # NrEvaluation (Size of experimental design)
        N = psi.shape[0]

        # Build the projection matrix
        PsiTPsi = np.dot(psi_sparse.T, psi_sparse)

        if np.linalg.cond(PsiTPsi) > 1e-12 and \
           np.linalg.cond(PsiTPsi) < 1/sys.float_info.epsilon:
            # faster
            M = sp.linalg.solve(PsiTPsi,
                                sp.sparse.eye(PsiTPsi.shape[0]).toarray())
        else:
            # stabler
            M = np.linalg.pinv(PsiTPsi)

        # h factor (the full matrix is not calculated explicitly,
        # only the trace is, to save memory)
        PsiM = np.dot(psi_sparse, M)

        h = np.sum(np.multiply(PsiM, psi_sparse), axis=1, dtype=np.float128)

        # ------ Calculate Error Loocv for each measurement point ----
        # Residuals
        if isinstance(clf, list):
            residual = np.dot(psi, coeffs) - y
        else:
            residual = clf.predict(psi) - y

        # Variance
        varY = np.var(y)

        if varY == 0:
            normEmpErr = 0
            ErrLoo = 0
            LCerror = np.zeros((y.shape))
        else:
            normEmpErr = np.mean(residual**2)/varY

            # LCerror = np.divide(residual, (1-h))
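            # Closed-form leave-one-out identity for linear models: the
            # prediction error at sample i with sample i left out equals
            # residual_i / (1 - h_i), so no model refits are required.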
            LCerror = residual / (1-h)[:, np.newaxis]
            ErrLoo = np.mean(np.square(LCerror)) / varY
            # if there are NaNs, just return an infinite LOO error (this
            # happens, e.g., when a strongly underdetermined problem is solved)
            if np.isnan(ErrLoo):
                ErrLoo = np.inf

        # Corrected Error for over-determined system
        trM = np.trace(M)
        if trM < 0 or abs(trM) > 1e6:
            trM = np.trace(np.linalg.pinv(np.dot(psi.T, psi)))

        # Over-determined system of Equation
        if N > P:
            T_factor = N/(N-P) * (1 + trM)

        # Under-determined system of Equation
        else:
            T_factor = np.inf

        CorrectedErrLoo = ErrLoo * T_factor

        Q_2 = 1 - CorrectedErrLoo

        return Q_2, residual

    # -------------------------------------------------------------------------
    def pca_transformation(self, Output):
        """
        Transforms the targets (outputs) via Principal Component Analysis

        Parameters
        ----------
        Output : array of shape (n_samples, n_features)
            Target values.

        Returns
        -------
        pca : obj
            Fitted sklearnPCA object.
        OutputMatrix : array of shape (n_samples, n_pca_components)
            Transformed target values.

        """
        # Transform via Principal Component Analysis
        if hasattr(self, 'var_pca_threshold'):
            var_pca_threshold = self.var_pca_threshold
        else:
            var_pca_threshold = 100.0
        n_samples, n_features = Output.shape

        if hasattr(self, 'n_pca_components'):
            n_pca_components = self.n_pca_components
        else:
            # Instantiate and fit sklearnPCA object
            covar_matrix = sklearnPCA(n_components=None)
            covar_matrix.fit(Output)
            var = np.cumsum(np.round(covar_matrix.explained_variance_ratio_,
                                     decimals=5)*100)
            # Find the number of components needed to explain
            # var_pca_threshold percent of the variance
            try:
                n_components = np.where(var >= var_pca_threshold)[0][0] + 1
            except IndexError:
                n_components = min(n_samples, n_features)

            n_pca_components = min(n_samples, n_features, n_components)

        # Print out a report
        print()
        print('-' * 50)
        print(f"PCA transformation is performed with {n_pca_components}"
              " components.")
        print('-' * 50)
        print()

        # Fit and transform with the selected number of components
        pca = sklearnPCA(n_components=n_pca_components,
                         svd_solver='randomized')
        OutputMatrix = pca.fit_transform(Output)

        return pca, OutputMatrix

    # -------------------------------------------------------------------------
    def gaussian_process_emulator(self, X, y, nug_term=None, autoSelect=False,
                                  varIdx=None):
        """
        Fits a Gaussian Process Emulator to the target given the training
        points.

        Parameters
        ----------
        X : array of shape (n_samples, n_params)
            Training points.
        y : array of shape (n_samples,)
            Target values.
        nug_term : float, optional
            Nugget term. The default is None, i.e. variance of y.
        autoSelect : bool, optional
            Loop over some kernels and select the best. The default is False.
        varIdx : int, optional
            The index number. The default is None.

        Returns
        -------
        gp : object
            Fitted estimator.

        """

        nug_term = nug_term if nug_term else np.var(y)

        Kernels = [nug_term * kernels.RBF(length_scale=1.0,
                                          length_scale_bounds=(1e-25, 1e15)),
                   nug_term * kernels.RationalQuadratic(length_scale=0.2,
                                                        alpha=1.0),
                   nug_term * kernels.Matern(length_scale=1.0,
                                             length_scale_bounds=(1e-15, 1e5),
                                             nu=1.5)]

        # Automatic selection of the kernel
        if autoSelect:
            gp = {}
            BME = []
            for i, kernel in enumerate(Kernels):
                gp[i] = GaussianProcessRegressor(kernel=kernel,
                                                 n_restarts_optimizer=3,
                                                 normalize_y=False)

                # Fit to data using Maximum Likelihood Estimation
                gp[i].fit(X, y)

                # Store the MLE as BME score
                BME.append(gp[i].log_marginal_likelihood())

            gp = gp[np.argmax(BME)]

        else:
            gp = GaussianProcessRegressor(kernel=Kernels[0],
                                          n_restarts_optimizer=3,
                                          normalize_y=False)
            gp.fit(X, y)

        # Compute score
        if varIdx is not None:
            Score = gp.score(X, y)
            print('-'*50)
            print(f'Output variable {varIdx}:')
            print('The estimation of GPE coefficients converged,')
            print(f'with the R^2 score: {Score:.3f}')
            print('-'*50)

        return gp

    # -------------------------------------------------------------------------
    def eval_metamodel(self, samples=None, nsamples=None,
                       sampling_method='random', return_samples=False):
        """
        Evaluates the meta-model at the requested samples. Alternatively,
        `nsamples` samples can be generated and evaluated.

        Parameters
        ----------
        samples : array of shape (n_samples, n_params), optional
            Samples to evaluate meta-model at. The default is None.
        nsamples : int, optional
            Number of samples to generate, if no `samples` is provided. The
            default is None.
        sampling_method : str, optional
            Type of sampling, if no `samples` is provided. The default is
            'random'.
        return_samples : bool, optional
            Return the samples, if no `samples` is provided. The default is
            False.

        Returns
        -------
        mean_pred : dict
            Mean of the predictions.
        std_pred : dict
            Standard deviation of the predictions.
        """
        if self.meta_model_type.lower() == 'gpe':
            model_dict = self.gp_poly
        else:
            model_dict = self.coeffs_dict

        if samples is None:
            if nsamples is None:
                self.n_samples = 100000
            else:
                self.n_samples = nsamples

            samples = self.ExpDesign.generate_samples(self.n_samples,
                                                      sampling_method)
        else:
            self.samples = samples
            self.n_samples = len(samples)

        # Transform samples
        samples = self.ExpDesign.transform(samples)

        if self.meta_model_type.lower() != 'gpe':
            univ_p_val = self.univ_basis_vals(samples,
                                              n_max=np.max(self.pce_deg))

        mean_pred = {}
        std_pred = {}

        # Loop over outputs
        for output, values in model_dict.items():

            mean = np.zeros((len(samples), len(values)))
            std = np.zeros((len(samples), len(values)))
            idx = 0
            for in_key, InIdxValues in values.items():

                # Prediction with GPE
                if self.meta_model_type.lower() == 'gpe':
                    X_T = self.x_scaler[output].transform(samples)
                    gp = self.gp_poly[output][in_key]
                    y_mean, y_std = gp.predict(X_T, return_std=True)

                else:
                    # Prediction with PCE or PCE-Kriging
                    # Assemble Psi matrix
                    psi = self.create_psi(self.basis_dict[output][in_key],
                                          univ_p_val)
                    # Prediction
                    try:
                        # with error bar
                        clf_poly = self.clf_poly[output][in_key]
                        y_mean, y_std = clf_poly.predict(psi, return_std=True)

                    except:
                        # without error bar
                        coeffs = self.coeffs_dict[output][in_key]
                        y_mean = np.dot(psi, coeffs)
                        y_std = np.zeros_like(y_mean)

                mean[:, idx] = y_mean
                std[:, idx] = y_std
                idx += 1

            if self.dim_red_method.lower() == 'pca':
                PCA = self.pca[output]
                mean_pred[output] = PCA.mean_ + np.dot(mean, PCA.components_)
                std_pred[output] = np.sqrt(np.dot(std**2, PCA.components_**2))
            else:
                mean_pred[output] = mean
                std_pred[output] = std

        if return_samples:
            return mean_pred, std_pred, samples
        else:
            return mean_pred, std_pred

    # -------------------------------------------------------------------------
    def create_model_error(self, X, y, name='Calib'):
        """
        Fits a GPE-based model error.

        Parameters
        ----------
        X : array of shape (n_outputs, n_inputs)
            Input array. It can contain any forcing inputs or coordinates of
            extracted data.
        y : array of shape (n_outputs,)
            The model response for the MAP parameter set.
        name : str, optional
            Calibration or validation. The default is `'Calib'`.

        Returns
        -------
        self: object
            Self object.

        """
        Model = self.ModelObj
        outputNames = Model.Output.names
        self.errorRegMethod = 'GPE'
        self.errorclf_poly = self.auto_vivification()
        self.errorScale = self.auto_vivification()

        # Read data
        MeasuredData = Model.read_observation(case=name)

        # Fitting GPR based bias model
        for out in outputNames:
            nan_idx = ~np.isnan(MeasuredData[out])
            # Select data
            try:
                data = MeasuredData[out].values[nan_idx]
            except AttributeError:
                data = MeasuredData[out][nan_idx]

            # Prepare the input matrix
            scaler = MinMaxScaler()
            delta = data  # - y[out][0]
            BiasInputs = np.hstack((X[out], y[out].reshape(-1, 1)))
            X_S = scaler.fit_transform(BiasInputs)
            gp = self.gaussian_process_emulator(X_S, delta)

            self.errorScale[out]["y_1"] = scaler
            self.errorclf_poly[out]["y_1"] = gp

        return self

    # -------------------------------------------------------------------------
    def eval_model_error(self, X, y_pred):
        """
        Evaluates the error model.

        Parameters
        ----------
        X : array
            Inputs.
        y_pred : dict
            Predictions.

        Returns
        -------
        mean_pred : dict
            Mean prediction of the GPE-based error model.
        std_pred : dict
            Standard deviation of the GPE-based error model.

        """
        mean_pred = {}
        std_pred = {}

        for Outkey, ValuesDict in self.errorclf_poly.items():

            pred_mean = np.zeros_like(y_pred[Outkey])
            pred_std = np.zeros_like(y_pred[Outkey])

            for Inkey, InIdxValues in ValuesDict.items():

                gp = self.errorclf_poly[Outkey][Inkey]
                scaler = self.errorScale[Outkey][Inkey]

                # Transform Samples using scaler
                for j, pred in enumerate(y_pred[Outkey]):
                    BiasInputs = np.hstack((X[Outkey], pred.reshape(-1, 1)))
                    Samples_S = scaler.transform(BiasInputs)
                    y_hat, y_std = gp.predict(Samples_S, return_std=True)
                    pred_mean[j] = y_hat
                    pred_std[j] = y_std
                    # pred_mean[j] += pred

            mean_pred[Outkey] = pred_mean
            std_pred[Outkey] = pred_std

        return mean_pred, std_pred

    # -------------------------------------------------------------------------
    class auto_vivification(dict):
        """
        Implementation of perl's AutoVivification feature.

        Source: https://stackoverflow.com/a/651879/18082457
        """

        def __getitem__(self, item):
            try:
                return dict.__getitem__(self, item)
            except KeyError:
                value = self[item] = type(self)()
                return value

    # -------------------------------------------------------------------------
    def __select_degree(self, ndim, nSamples):
        """
        Selects degree based on the number of samples and parameters in the
        sequential design.

        Parameters
        ----------
        ndim : int
            Number of uncertain input parameters.
        nSamples : int
            Number of samples in the experimental design.

        Returns
        -------
        DegreeArray : array
            Array of polynomial degrees to be examined during the adaptive
            fitting.

        """
        # Define the DegreeArray
        max_deg = np.max(self.pce_deg)
        min_Deg = np.min(self.pce_deg)
        nitr = nSamples - self.ExpDesign.n_init_samples

        # Check q-norm
        if not np.isscalar(self.pce_q_norm):
            self.pce_q_norm = np.array(self.pce_q_norm)
        else:
            self.pce_q_norm = np.array([self.pce_q_norm])

        def M_uptoMax(maxDeg):
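            # Number of terms in a full total-degree basis of degree d for
            # ndim parameters: C(ndim + d, d) = (ndim + d)! / (ndim! * d!)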
            n_combo = np.zeros(maxDeg)
            for i, d in enumerate(range(1, maxDeg+1)):
                n_combo[i] = math.factorial(ndim+d)
                n_combo[i] /= math.factorial(ndim) * math.factorial(d)
            return n_combo

        if self.ExpDesignFlag != 'sequential':
            degNew = max_deg
        else:
            d = nitr if nitr != 0 and self.n_params > 5 else 1
            min_index = np.argmin(abs(M_uptoMax(max_deg)-ndim*nSamples*d))
            degNew = range(1, max_deg+1)[min_index]

        if degNew > min_Deg and self.pce_reg_method.lower() != 'fastard':
            DegreeArray = np.arange(min_Deg, degNew+1)
        else:
            DegreeArray = np.array([degNew])

        return DegreeArray

Class variables

var auto_vivification

Implementation of perl's AutoVivification feature.

Source: https://stackoverflow.com/a/651879/18082457
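
A short illustration of the behavior (nested keys are created on first access, so no intermediate dictionaries need to be initialized):

>>> d = MetaModel.auto_vivification()
>>> d['Z']['y_1'] = 0.5  # no KeyError; the inner dict is created on the fly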

Methods

def create_metamodel(self, Model)

Starts the training of the meta-model for the model object containing the given computational model.

Parameters

Model : obj
Model object.

Returns

metamodel : obj
The meta model object.
Expand source code
def create_metamodel(self, Model):
    """
    Starts the training of the meta-model for the model object containing
    the given computational model.

    Parameters
    ----------
    Model : obj
        Model object.

    Returns
    -------
    metamodel : obj
        The meta model object.

    """
    self.ModelObj = Model
    self.n_params = len(self.input_obj.Marginals)
    self.ExpDesignFlag = 'normal'
    # --- Prepare pce degree ---
    if self.meta_model_type.lower() == 'pce':
        if type(self.pce_deg) is not np.ndarray:
            self.pce_deg = np.array(self.pce_deg)

    if self.ExpDesign.method == 'sequential':
        from .sequential_design import SeqDesign
        seq_design = SeqDesign(self)
        metamodel = seq_design.train_seq_design(Model)

    elif self.ExpDesign.method == 'normal':
        self.ExpDesignFlag = 'normal'
        metamodel = self.train_norm_design(Model)

    else:
        raise Exception("The method for experimental design you requested"
                        " has not been implemented yet.")

    # Zip the model run directories
    if self.ModelObj.link_type.lower() == 'pylink':
        Model.zip_subdirs(Model.name, f'{Model.name}_')

    return metamodel
def train_norm_design(self, Model, verbose=False)

This function loops over the outputs and each time step/point and fits the meta model.

Parameters

Model : obj
Model object.
verbose : bool, optional
Flag for a sequential design in silent mode. The default is False.

Returns

self : obj
Meta-model object.
Expand source code
def train_norm_design(self, Model, verbose=False):
    """
    This function loops over the outputs and each time step/point and fits
    the meta model.

    Parameters
    ----------
    Model : obj
        Model object.
    verbose : bool, optional
        Flag for a sequential design in silent mode. The default is False.

    Returns
    -------
    self: obj
        Meta-model object.

    """

    # Get the collocation points to run the forward model
    CollocationPoints, OutputDict = self.generate_ExpDesign(Model)

    # Initialize the nested dictionaries
    self.deg_dict = self.auto_vivification()
    self.q_norm_dict = self.auto_vivification()
    self.coeffs_dict = self.auto_vivification()
    self.basis_dict = self.auto_vivification()
    self.score_dict = self.auto_vivification()
    self.clf_poly = self.auto_vivification()
    self.gp_poly = self.auto_vivification()
    self.pca = self.auto_vivification()
    self.LCerror = self.auto_vivification()
    self.x_scaler = {}

    # Define the DegreeArray
    nSamples, ndim = CollocationPoints.shape
    self.DegreeArray = self.__select_degree(ndim, nSamples)

    # Generate all basis indices
    self.allBasisIndices = self.auto_vivification()
    for deg in self.DegreeArray:
        keys = self.allBasisIndices.keys()
        if deg not in np.fromiter(keys, dtype=float):
            # Generate the polynomial basis indices
            for qidx, q in enumerate(self.pce_q_norm):
                basis_indices = self.create_basis_indices(degree=deg,
                                                          q_norm=q)
                self.allBasisIndices[str(deg)][str(q)] = basis_indices

    # Evaluate the univariate polynomials on ExpDesign
    if self.meta_model_type.lower() != 'gpe':
        self.univ_p_val = self.univ_basis_vals(CollocationPoints)

    if 'x_values' in OutputDict:
        self.ExpDesign.x_values = OutputDict['x_values']
        del OutputDict['x_values']

    # --- Loop through data points and fit the surrogate ---
    if not verbose:
        print(f"\n>>>> Training the {self.meta_model_type} metamodel "
              "started. <<<<<<\n")
        items = tqdm(OutputDict.items(), desc="Fitting regression")
    else:
        items = OutputDict.items()

    # For loop over the components/outputs
    for key, Output in items:

        # Dimensionality reduction with PCA, if specified
        if self.dim_red_method.lower() == 'pca':
            self.pca[key], target = self.pca_transformation(Output)
        else:
            target = Output

        # Parallel fit regression
        if self.meta_model_type.lower() == 'gpe':
            # Prepare the input matrix
            scaler = MinMaxScaler()
            X_S = scaler.fit_transform(CollocationPoints)

            self.x_scaler[key] = scaler

            out = Parallel(n_jobs=-1, prefer='threads')(
                delayed(self.gaussian_process_emulator)(X_S, target[:, idx])
                for idx in range(target.shape[1]))

            for idx in range(target.shape[1]):
                self.gp_poly[key][f"y_{idx+1}"] = out[idx]

        else:
            out = Parallel(n_jobs=-1, prefer='threads')(
                delayed(self.adaptive_regression)(CollocationPoints,
                                                  target[:, idx], idx)
                for idx in range(target.shape[1]))

            for i in range(target.shape[1]):
                # Create a dict to pass the variables
                self.deg_dict[key][f"y_{i+1}"] = out[i]['degree']
                self.q_norm_dict[key][f"y_{i+1}"] = out[i]['qnorm']
                self.coeffs_dict[key][f"y_{i+1}"] = out[i]['coeffs']
                self.basis_dict[key][f"y_{i+1}"] = out[i]['multi_indices']
                self.score_dict[key][f"y_{i+1}"] = out[i]['LOOCVScore']
                self.clf_poly[key][f"y_{i+1}"] = out[i]['clf_poly']
                self.LCerror[key][f"y_{i+1}"] = out[i]['LCerror']

    if not verbose:
        print(f"\n>>>> Training the {self.meta_model_type} metamodel"
              " sucessfully completed. <<<<<<\n")

    return self
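For orientation, a minimal training sketch follows. It assumes that `inputs` is an Input object with marginals already defined and `model` is a configured PyLinkForwardModel; the constructor signature and the chosen attribute values are assumptions, named to mirror the attributes used in the source above.

    import numpy as np
    from bayesvalidrox import MetaModel

    # Sketch only: `inputs` (Input) and `model` (PyLinkForwardModel) are
    # assumed to be configured elsewhere.
    metamodel = MetaModel(inputs)             # assumed constructor signature
    metamodel.ModelObj = model
    metamodel.meta_model_type = 'aPCE'
    metamodel.pce_reg_method = 'FastARD'
    metamodel.pce_deg = np.arange(1, 6)       # candidate polynomial degrees
    metamodel.pce_q_norm = [0.75, 1.0]        # candidate q-norms

    metamodel.add_ExpDesign()
    metamodel.ExpDesign.n_init_samples = 50
    metamodel.ExpDesign.sampling_method = 'latin_hypercube'

    metamodel = metamodel.train_norm_design(model)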
def create_basis_indices(self, degree, q_norm)

Creates the set of multi-indices of the multivariate polynomials for a given number of parameters, polynomial degree and hyperbolic (q-norm) truncation scheme.

Parameters

degree : int
Polynomial degree.
q_norm : float
hyperbolic (or q-norm) truncation.

Returns

basis_indices : array of shape (n_terms, n_params)
Multi-indices of multivariate polynomials.
Expand source code
def create_basis_indices(self, degree, q_norm):
    """
    Creates the set of multi-indices of the multivariate polynomials for
    a given number of parameters, polynomial degree and hyperbolic
    (q-norm) truncation scheme.

    Parameters
    ----------
    degree : int
        Polynomial degree.
    q_norm : float
        hyperbolic (or q-norm) truncation.

    Returns
    -------
    basis_indices : array of shape (n_terms, n_params)
        Multi-indices of multivariate polynomials.

    """
    basis_indices = glexindex(start=0, stop=degree+1,
                              dimensions=self.n_params,
                              cross_truncation=q_norm,
                              reverse=False, graded=True)
    return basis_indices
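The hyperbolic truncation keeps a multi-index alpha only if (sum_i alpha_i^q)^(1/q) <= degree, so q_norm=1 recovers the full total-degree basis and smaller values prune high-order interaction terms. A self-contained illustration of the rule (independent of the glexindex implementation used above):

    import itertools
    import numpy as np

    def hyperbolic_index_set(n_params, degree, q_norm):
        """Enumerate multi-indices and keep those within the q-norm ball."""
        indices = []
        for alpha in itertools.product(range(degree + 1), repeat=n_params):
            alpha = np.array(alpha, dtype=float)
            if np.sum(alpha**q_norm)**(1 / q_norm) <= degree:
                indices.append(alpha)
        return np.array(indices)

    print(len(hyperbolic_index_set(3, 4, 1.0)))   # 35 terms (full basis)
    print(len(hyperbolic_index_set(3, 4, 0.5)))   # fewer, less interaction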
def add_ExpDesign(self)

Instantiates the experimental design object.

Returns

None.

Expand source code
def add_ExpDesign(self):
    """
    Instantiates the experimental design object.

    Returns
    -------
    None.

    """
    self.ExpDesign = ExpDesigns(self.input_obj,
                                meta_Model=self.meta_model_type)
def generate_ExpDesign(self, Model)

Prepares the experimental design either by reading from the prescribed data or running simulations.

Parameters

Model : obj
Model object.

Raises

Exception
If model simulations are not provided properly.

Returns

ED_X_tr : array of shape (n_samples, n_params)
Training samples transformed by an isoprobabilistic transformation.
ED_Y : dict
Model simulations (target) for all outputs.
Expand source code
def generate_ExpDesign(self, Model):
    """
    Prepares the experimental design either by reading from the prescribed
    data or running simulations.

    Parameters
    ----------
    Model : obj
        Model object.

    Raises
    ------
    Exception
        If model simulations are not provided properly.

    Returns
    -------
    ED_X_tr: array of shape (n_samples, n_params)
        Training samples transformed by an isoprobabilistic transformation.
    ED_Y: dict
        Model simulations (target) for all outputs.
    """
    ExpDesign = self.ExpDesign
    if self.ExpDesignFlag != 'sequential':
        # Read ExpDesign (training and targets) from the provided hdf5
        if ExpDesign.hdf5_file is not None:

            # Read hdf5 file
            f = h5py.File(ExpDesign.hdf5_file, 'r+')

            # Read EDX and pass it to ExpDesign object
            try:
                ExpDesign.X = np.array(f["EDX/New_init_"])
            except KeyError:
                ExpDesign.X = np.array(f["EDX/init_"])

            # Update number of initial samples
            ExpDesign.n_init_samples = ExpDesign.X.shape[0]

            # Read EDY (model outputs) and pass it to ExpDesign object
            out_names = self.ModelObj.Output.names
            ExpDesign.Y = {}

            # Extract x values
            try:
                ExpDesign.Y["x_values"] = dict()
                for varIdx, var in enumerate(out_names):
                    x = np.array(f[f"x_values/{var}"])
                    ExpDesign.Y["x_values"][var] = x
            except KeyError:
                ExpDesign.Y["x_values"] = np.array(f["x_values"])

            # Store the output
            for varIdx, var in enumerate(out_names):
                try:
                    y = np.array(f[f"EDY/{var}/New_init_"])
                except KeyError:
                    y = np.array(f[f"EDY/{var}/init_"])
                ExpDesign.Y[var] = y
            f.close()
        else:
            # Check if an old hdf5 file exists: if yes, rename it
            hdf5file = f'ExpDesign_{self.ModelObj.name}.hdf5'
            if os.path.exists(hdf5file):
                os.rename(hdf5file, 'old_'+hdf5file)

    # ---- Prepare X samples ----
    ED_X, ED_X_tr = ExpDesign.generate_ED(ExpDesign.n_init_samples,
                                          ExpDesign.sampling_method,
                                          transform=True,
                                          max_pce_deg=np.max(self.pce_deg))
    ExpDesign.X = ED_X
    ExpDesign.collocationPoints = ED_X_tr
    self.bound_tuples = ExpDesign.bound_tuples

    # ---- Run simulations at X ----
    if not hasattr(ExpDesign, 'Y') or ExpDesign.Y is None:
        print('\n Now the forward model needs to be run!\n')
        ED_Y, up_ED_X = Model.run_model_parallel(ED_X)
        ExpDesign.X = up_ED_X
        self.ModelOutputDict = ED_Y
        ExpDesign.Y = ED_Y
    else:
        # Check if a dict has been passed.
        if type(ExpDesign.Y) is dict:
            self.ModelOutputDict = ExpDesign.Y
        else:
            raise Exception('Please provide either a dictionary or an hdf5 '
                            'file to ExpDesign.hdf5_file argument.')

    return ED_X_tr, self.ModelOutputDict
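When `ExpDesign.hdf5_file` is set, the reader above expects the dataset layout sketched below. The file name and the output name 'Z' are hypothetical placeholders; the dataset paths are taken from the source.

    import h5py
    import numpy as np

    with h5py.File('ExpDesign_MyModel.hdf5', 'w') as f:
        f.create_dataset('EDX/init_', data=np.random.rand(50, 3))     # samples
        f.create_dataset('x_values', data=np.linspace(0, 1, 10))      # shared x
        f.create_dataset('EDY/Z/init_', data=np.random.rand(50, 10))  # outputs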
def univ_basis_vals(self, samples, n_max=None)

Evaluates univariate regressors along input directions.

Parameters

samples : array of shape (n_samples, n_params)
Samples.
n_max : int, optional
Maximum polynomial degree. The default is None.

Returns

univ_basis : array of shape (n_samples, n_params, n_max+1)
All univariate regressors up to n_max.
Expand source code
def univ_basis_vals(self, samples, n_max=None):
    """
    Evaluates univariate regressors along input directions.

    Parameters
    ----------
    samples : array of shape (n_samples, n_params)
        Samples.
    n_max : int, optional
        Maximum polynomial degree. The default is `None`.

    Returns
    -------
    univ_basis: array of shape (n_samples, n_params, n_max+1)
        All univariate regressors up to n_max.
    """
    # Extract information
    poly_types = self.ExpDesign.poly_types
    if samples.ndim != 2:
        samples = samples.reshape(1, len(samples))
    n_max = np.max(self.pce_deg) if n_max is None else n_max

    # Extract poly coeffs
    if self.ExpDesign.input_data_given or self.ExpDesign.apce:
        apolycoeffs = self.ExpDesign.polycoeffs
    else:
        apolycoeffs = None

    # Evaluate univariate basis
    univ_basis = eval_univ_basis(samples, n_max, poly_types, apolycoeffs)

    return univ_basis
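A shape sketch, assuming `metamodel` is a trained or at least fully configured MetaModel so that `ExpDesign.poly_types` (and, for arbitrary PCE, the polynomial coefficients) are available:

    import numpy as np

    samples = np.random.rand(100, 3)                   # 100 samples, 3 params
    univ = metamodel.univ_basis_vals(samples, n_max=5)
    print(univ.shape)                                  # (100, 3, 6)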
def create_psi(self, basis_indices, univ_p_val)

This function assembles the design matrix Psi from the given basis index set basis_indices and the univariate polynomial evaluations univ_p_val.

Parameters

basis_indices : array of shape (n_terms, n_params)
Multi-indices of multivariate polynomials.
univ_p_val : array of (n_samples, n_params, n_max+1)
All univariate regressors up to n_max.

Raises

ValueError
n_terms in arguments do not match.

Returns

psi : array of shape (n_samples, n_terms)
Multivariate regressors.
Expand source code
def create_psi(self, basis_indices, univ_p_val):
    """
    This function assembles the design matrix Psi from the given basis
    index set `basis_indices` and the univariate polynomial evaluations
    `univ_p_val`.

    Parameters
    ----------
    basis_indices : array of shape (n_terms, n_params)
        Multi-indices of multivariate polynomials.
    univ_p_val : array of (n_samples, n_params, n_max+1)
        All univariate regressors up to `n_max`.

    Raises
    ------
    ValueError
        n_terms in arguments do not match.

    Returns
    -------
    psi : array of shape (n_samples, n_terms)
        Multivariate regressors.

    """
    # Check if BasisIndices is a sparse matrix
    sparsity = sp.sparse.issparse(basis_indices)
    if sparsity:
        basis_indices = basis_indices.toarray()

    # Initialization and consistency checks
    # number of input variables
    n_params = univ_p_val.shape[1]

    # Size of the experimental design
    n_samples = univ_p_val.shape[0]

    # number of basis terms
    n_terms = basis_indices.shape[0]

    # check that the variables have consistent sizes
    if n_params != basis_indices.shape[1]:
        raise ValueError("The shapes of basis_indices and univ_p_val don't"
                         " match!!")

    # Preallocate the Psi matrix for performance
    psi = np.ones((n_samples, n_terms))
    # Assemble the Psi matrix
    for m in range(basis_indices.shape[1]):
        aa = np.where(basis_indices[:, m] > 0)[0]
        try:
            basisIdx = basis_indices[aa, m]
            bb = np.reshape(univ_p_val[:, m, basisIdx], psi[:, aa].shape)
            psi[:, aa] = np.multiply(psi[:, aa], bb)
        except ValueError as err:
            raise err

    return psi
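In words, each entry of Psi multiplies, across the input dimensions, the univariate polynomial of the degree prescribed by the multi-index: psi[n, k] = prod_m univ_p_val[n, m, basis_indices[k, m]]. A tiny self-contained restatement of that rule with random placeholder values:

    import numpy as np

    rng = np.random.default_rng(0)
    n_samples, n_params, n_max = 4, 2, 3
    univ_p_val = rng.normal(size=(n_samples, n_params, n_max + 1))
    basis_indices = np.array([[0, 0], [1, 0], [0, 2], [1, 1]])

    psi = np.ones((n_samples, len(basis_indices)))
    for k, alpha in enumerate(basis_indices):
        for m in range(n_params):
            psi[:, k] *= univ_p_val[:, m, alpha[m]]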
def fit(self, X, y, basis_indices, reg_method=None)

Fit regression using the regression method provided.

Parameters

X : array of shape (n_samples, n_features)
Training vector, where n_samples is the number of samples and n_features is the number of features.
y : array of shape (n_samples,)
Target values.
basis_indices : array of shape (n_terms, n_params)
Multi-indices of multivariate polynomials.
reg_method : str, optional
Regression method to use. The default is None, in which case self.pce_reg_method is used.

Returns

return_out_dict : Dict
Fitted estimator, sparse multi-indices, sparse design matrix and coefficients (dict keys 'clf_poly', 'spareMulti-Index', 'sparePsi' and 'coeffs').
Expand source code
def fit(self, X, y, basis_indices, reg_method=None):
    """
    Fit regression using the regression method provided.

    Parameters
    ----------
    X : array of shape (n_samples, n_features)
        Training vector, where n_samples is the number of samples and
        n_features is the number of features.
    y : array of shape (n_samples,)
        Target values.
    basis_indices : array of shape (n_terms, n_params)
        Multi-indices of multivariate polynomials.
    reg_method : str, optional
        Regression method to use. The default is None, in which case
        `self.pce_reg_method` is used.

    Returns
    -------
    return_out_dict : Dict
        Fitted estimator, sparse multi-indices, sparse design matrix and
        coefficients (dict keys 'clf_poly', 'spareMulti-Index', 'sparePsi'
        and 'coeffs').

    """
    if reg_method is None:
        reg_method = self.pce_reg_method

    # Check if BasisIndices is a sparse matrix
    sparsity = sp.sparse.issparse(basis_indices)

    clf_poly = []
    compute_score = True if self.verbose else False

    #  inverse of the observed variance of the data
    if np.var(y) != 0:
        Lambda = 1 / np.var(y)
    else:
        Lambda = 1e-6

    # Bayes sparse adaptive aPCE
    if reg_method.lower() != 'ols':
        if reg_method.lower() == 'brr' or np.var(y) == 0:
            clf_poly = lm.BayesianRidge(n_iter=1000, tol=1e-7,
                                        fit_intercept=True,
                                        normalize=True,
                                        compute_score=compute_score,
                                        alpha_1=1e-04, alpha_2=1e-04,
                                        lambda_1=Lambda, lambda_2=Lambda)
            clf_poly.converged = True

        elif reg_method.lower() == 'ard':
            clf_poly = lm.ARDRegression(fit_intercept=True,
                                        normalize=True,
                                        compute_score=compute_score,
                                        n_iter=1000, tol=0.0001,
                                        alpha_1=1e-3, alpha_2=1e-3,
                                        lambda_1=Lambda, lambda_2=Lambda)

        elif reg_method.lower() == 'fastard':
            clf_poly = RegressionFastARD(fit_intercept=True,
                                         normalize=True,
                                         compute_score=compute_score,
                                         n_iter=300, tol=1e-10)

        elif reg_method.lower() == 'bcs':
            clf_poly = RegressionFastLaplace(fit_intercept=False,
                                             n_iter=1000, tol=1e-7)

        elif reg_method.lower() == 'lars':
            clf_poly = lm.LassoLarsCV(fit_intercept=False)

        elif reg_method.lower() == 'sgdr':
            clf_poly = lm.SGDRegressor(fit_intercept=False,
                                       max_iter=5000, tol=1e-7)

        elif reg_method.lower() == 'omp':
            clf_poly = lm.OrthogonalMatchingPursuitCV(fit_intercept=False,
                                                      max_iter=10)

        elif reg_method.lower() == 'vbl':
            clf_poly = VBLinearRegression(fit_intercept=False)

        elif reg_method.lower() == 'ebl':
            clf_poly = EBLinearRegression(optimizer='em')

        # Fit
        clf_poly.fit(X, y)

        # Select the nonzero entries of coefficients
        # The first column must be kept (For mean calculations)
        nnz_idx = np.nonzero(clf_poly.coef_)[0]

        if len(nnz_idx) == 0 or nnz_idx[0] != 0:
            nnz_idx = np.insert(np.nonzero(clf_poly.coef_)[0], 0, 0)
            # Remove the zero entries for Bases and PSI if need be
            if sparsity:
                sparse_basis_indices = basis_indices.toarray()[nnz_idx]
            else:
                sparse_basis_indices = basis_indices[nnz_idx]
            sparse_X = X[:, nnz_idx]

            # Store the coefficients of the regression model
            clf_poly.fit(sparse_X, y)
            coeffs = clf_poly.coef_
        else:
            # This is for the case where all outputs are zero, thereby
            # all coefficients are zero
            if sparsity:
                sparse_basis_indices = basis_indices.toarray()
            else:
                sparse_basis_indices = basis_indices
            sparse_X = X
            coeffs = clf_poly.coef_

    # Ordinary least square method (OLS)
    else:
        if sparsity:
            sparse_basis_indices = basis_indices.toarray()
        else:
            sparse_basis_indices = basis_indices
        sparse_X = X

        X_T_X = np.dot(sparse_X.T, sparse_X)

        if np.linalg.cond(X_T_X) > 1e-12 and \
           np.linalg.cond(X_T_X) < 1 / sys.float_info.epsilon:
            # faster
            coeffs = sp.linalg.solve(X_T_X, np.dot(sparse_X.T, y))
        else:
            # stabler
            coeffs = np.dot(np.dot(np.linalg.pinv(X_T_X), sparse_X.T), y)

    # Create a dict to pass the outputs
    return_out_dict = dict()
    return_out_dict['clf_poly'] = clf_poly
    return_out_dict['spareMulti-Index'] = sparse_basis_indices
    return_out_dict['sparePsi'] = sparse_X
    return_out_dict['coeffs'] = coeffs
    return return_out_dict
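A sketch of calling fit directly, assuming `metamodel` is a configured MetaModel (see the training sketch above); the design matrix, targets and multi-indices here are random placeholders:

    import numpy as np

    rng = np.random.default_rng(1)
    psi = rng.normal(size=(30, 5))                    # placeholder Psi matrix
    basis_indices = rng.integers(0, 3, size=(5, 2))   # placeholder indices
    y = psi @ np.array([1.0, 0.5, 0.0, 0.0, -0.2])    # sparse ground truth

    out = metamodel.fit(psi, y, basis_indices, reg_method='BRR')
    print(out['coeffs'])                # coefficients after sparsification
    print(out['spareMulti-Index'])      # retained multi-indices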
def adaptive_regression(self, ED_X, ED_Y, varIdx, verbose=False)

Adaptively fits the PCE model by comparing the scores of different degrees and q-norms.

Parameters

ED_X : array of shape (n_samples, n_params)
Experimental design.
ED_Y : array of shape (n_samples,)
Target values, i.e. simulation results for the Experimental design.
varIdx : int
Index of the output.
verbose : bool, optional
Print out summary. The default is False.

Returns

returnVars : Dict
Fitted estimator, best degree, best q-norm, LOOCVScore and coefficients.
Expand source code
def adaptive_regression(self, ED_X, ED_Y, varIdx, verbose=False):
    """
    Adaptively fits the PCE model by comparing the scores of different
    degrees and q-norms.

    Parameters
    ----------
    ED_X : array of shape (n_samples, n_params)
        Experimental design.
    ED_Y : array of shape (n_samples,)
        Target values, i.e. simulation results for the Experimental design.
    varIdx : int
        Index of the output.
    verbose : bool, optional
        Print out summary. The default is False.

    Returns
    -------
    returnVars : Dict
        Fitted estimator, best degree, best q-norm, LOOCVScore and
        coefficients.

    """

    NrSamples, n_params = ED_X.shape
    # Initialization
    qAllCoeffs, AllCoeffs = {}, {}
    qAllIndices_Sparse, AllIndices_Sparse = {}, {}
    qAllclf_poly, Allclf_poly = {}, {}
    qAllnTerms, AllnTerms = {}, {}
    qAllLCerror, AllLCerror = {}, {}

    # Extract degree array and qnorm array
    DegreeArray = np.array([*self.allBasisIndices], dtype=int)
    qnorm = [*self.allBasisIndices[str(int(DegreeArray[0]))]]

    # Some options for EarlyStop
    errorIncreases = False
    # Stop degree, if LOO error does not decrease n_checks_degree times
    n_checks_degree = 3
    # Stop qNorm, if criterion isn't fulfilled n_checks_qNorm times
    n_checks_qNorm = 2
    nqnorms = len(qnorm)
    qNormEarlyStop = True
    if nqnorms < n_checks_qNorm+1:
        qNormEarlyStop = False

    # =====================================================================
    # basis adaptive polynomial chaos: repeat the calculation by increasing
    # polynomial degree until the highest accuracy is reached
    # =====================================================================
    # For each degree check all q-norms and choose the best one
    scores = -np.inf * np.ones(DegreeArray.shape[0])
    qNormScores = -np.inf * np.ones(nqnorms)

    for degIdx, deg in enumerate(DegreeArray):

        for qidx, q in enumerate(qnorm):

            # Extract the polynomial basis indices from the pool of
            # allBasisIndices
            BasisIndices = self.allBasisIndices[str(deg)][str(q)]

            # Assemble the Psi matrix
            Psi = self.create_psi(BasisIndices, self.univ_p_val)

            # Calculate the coefficients of the meta model
            outs = self.fit(Psi, ED_Y, BasisIndices)

            # Calculate and save the score of LOOCV
            score, LCerror = self.corr_loocv_error(outs['clf_poly'],
                                                   outs['sparePsi'],
                                                   outs['coeffs'],
                                                   ED_Y)

            # Check the convergence of noise for FastARD
            if self.pce_reg_method == 'FastARD' and \
               outs['clf_poly'].alpha_ < np.finfo(np.float32).eps:
                score = -np.inf

            qNormScores[qidx] = score
            qAllCoeffs[str(qidx+1)] = outs['coeffs']
            qAllIndices_Sparse[str(qidx+1)] = outs['spareMulti-Index']
            qAllclf_poly[str(qidx+1)] = outs['clf_poly']
            qAllnTerms[str(qidx+1)] = BasisIndices.shape[0]
            qAllLCerror[str(qidx+1)] = LCerror

            # EarlyStop check
            # if there are at least n_checks_qNorm entries after the
            # best one, we stop
            if qNormEarlyStop and \
               sum(np.isfinite(qNormScores)) > n_checks_qNorm:
                # If the error has increased the last two iterations, stop!
                qNormScores_nonInf = qNormScores[np.isfinite(qNormScores)]
                deltas = np.sign(np.diff(qNormScores_nonInf))
                if sum(deltas[-n_checks_qNorm+1:]) == 2:
                    # stop the q-norm loop here
                    break
            if np.var(ED_Y) == 0:
                break

        # Store the score in the scores list
        best_q = np.nanargmax(qNormScores)
        scores[degIdx] = qNormScores[best_q]

        AllCoeffs[str(degIdx+1)] = qAllCoeffs[str(best_q+1)]
        AllIndices_Sparse[str(degIdx+1)] = qAllIndices_Sparse[str(best_q+1)]
        Allclf_poly[str(degIdx+1)] = qAllclf_poly[str(best_q+1)]
        AllnTerms[str(degIdx+1)] = qAllnTerms[str(best_q+1)]
        AllLCerror[str(degIdx+1)] = qAllLCerror[str(best_q+1)]

        # Check the direction of the error (on average):
        # if it increases consistently stop the iterations
        if len(scores[scores != -np.inf]) > n_checks_degree:
            scores_nonInf = scores[scores != -np.inf]
            ss = np.sign(scores_nonInf - np.max(scores_nonInf))
            # ss<0 error decreasing
            errorIncreases = np.sum(np.sum(ss[-2:])) <= -1*n_checks_degree

        if errorIncreases:
            break

        # Check only one degree, if target matrix has zero variance
        if np.var(ED_Y) == 0:
            break

    # ------------------ Summary of results ------------------
    # Select the one with the best score and save the necessary outputs
    best_deg = np.nanargmax(scores)+1
    coeffs = AllCoeffs[str(best_deg)]
    basis_indices = AllIndices_Sparse[str(best_deg)]
    clf_poly = Allclf_poly[str(best_deg)]
    LOOCVScore = np.nanmax(scores)
    P = AllnTerms[str(best_deg)]
    LCerror = AllLCerror[str(best_deg)]
    degree = DegreeArray[np.nanargmax(scores)]
    qnorm = float(qnorm[best_q])

    # ------------------ Print out Summary of results ------------------
    if self.verbose:
        # Create PSI_Sparse by removing redundant terms
        nnz_idx = np.nonzero(coeffs)[0]
        BasisIndices_Sparse = basis_indices[nnz_idx]

        print(f'Output variable {varIdx+1}:')
        print('The estimation of PCE coefficients converged at polynomial '
              f'degree {DegreeArray[best_deg-1]} with '
              f'{len(BasisIndices_Sparse)} terms (Sparsity index = '
              f'{round(len(BasisIndices_Sparse)/P, 3)}).')

        print(f'Final ModLOO error estimate: {1-max(scores):.3e}')
        print('\n'+'-'*50)

    if verbose:
        print('='*50)
        print(' '*10 + ' Summary of results ')
        print('='*50)

        print("scores:\n", scores)
        print("Best score's degree:", self.DegreeArray[best_deg-1])
        print("NO. of terms:", len(basis_indices))
        print("Sparsity index:", round(len(basis_indices)/P, 3))
        print("Best Indices:\n", basis_indices)

        if self.pce_reg_method in ['BRR', 'ARD']:
            fig, ax = plt.subplots(figsize=(12, 10))
            plt.title("Marginal log-likelihood")
            plt.plot(clf_poly.scores_, color='navy', linewidth=2)
            plt.ylabel("Score")
            plt.xlabel("Iterations")
            if self.pce_reg_method.lower() == 'brr':
                text = (f"$\\alpha={clf_poly.alpha_:.1f}$\n"
                        f"$\\lambda={clf_poly.lambda_:.3f}$\n"
                        f"$L={clf_poly.scores_[-1]:.1f}$")
            else:
                text = (f"$\\alpha={clf_poly.alpha_:.1f}$\n"
                        f"$L={clf_poly.scores_[-1]:.1f}$")

            plt.text(0.75, 0.5, text, fontsize=18, transform=ax.transAxes)
            plt.show()
        print('='*80)

    # Create a dict to pass the outputs
    returnVars = dict()
    returnVars['clf_poly'] = clf_poly
    returnVars['degree'] = degree
    returnVars['qnorm'] = qnorm
    returnVars['coeffs'] = coeffs
    returnVars['multi_indices'] = basis_indices
    returnVars['LOOCVScore'] = LOOCVScore
    returnVars['LCerror'] = LCerror

    return returnVars
def corr_loocv_error(self, clf, psi, coeffs, y)

Calculates the corrected LOO error for regression on regressor matrix psi that generated the coefficients based on [1] and [2].

[1] Blatman, G., 2009. Adaptive sparse polynomial chaos expansions for uncertainty propagation and sensitivity analysis (Doctoral dissertation, Clermont-Ferrand 2).

[2] Blatman, G. and Sudret, B., 2011. Adaptive sparse polynomial chaos expansion based on least angle regression. Journal of computational Physics, 230(6), pp.2345-2367.

Parameters

clf : object
Fitted estimator.
psi : array of shape (n_samples, n_features)
The multivariate orthogonal polynomials (regressor).
coeffs : array-like of shape (n_features,)
Estimated coefficients.
y : array of shape (n_samples,)
Target values.

Returns

Q_2 : float
LOOCV validation score (1 - LOOCV error).
residual : array of shape (n_samples,)
Residual values (y - predicted targets).
Expand source code
def corr_loocv_error(self, clf, psi, coeffs, y):
    """
    Calculates the corrected LOO error for regression on regressor
    matrix `psi` that generated the coefficients based on [1] and [2].

    [1] Blatman, G., 2009. Adaptive sparse polynomial chaos expansions for
        uncertainty propagation and sensitivity analysis (Doctoral
        dissertation, Clermont-Ferrand 2).

    [2] Blatman, G. and Sudret, B., 2011. Adaptive sparse polynomial chaos
        expansion based on least angle regression. Journal of computational
        Physics, 230(6), pp.2345-2367.

    Parameters
    ----------
    clf : object
        Fitted estimator.
    psi : array of shape (n_samples, n_features)
        The multivariate orthogonal polynomials (regressor).
    coeffs : array-like of shape (n_features,)
        Estimated coefficients.
    y : array of shape (n_samples,)
        Target values.

    Returns
    -------
    Q_2 : float
        LOOCV validation score (1 - LOOCV error).
    residual : array of shape (n_samples,)
        Residual values (y - predicted targets).

    """
    psi = np.array(psi, dtype=float)

    # Create PSI_Sparse by removing redundant terms
    nnz_idx = np.nonzero(coeffs)[0]
    if len(nnz_idx) == 0:
        nnz_idx = [0]
    psi_sparse = psi[:, nnz_idx]

    # NrCoeffs of aPCEs
    P = len(nnz_idx)
    # NrEvaluation (Size of experimental design)
    N = psi.shape[0]

    # Build the projection matrix
    PsiTPsi = np.dot(psi_sparse.T, psi_sparse)

    if np.linalg.cond(PsiTPsi) > 1e-12 and \
       np.linalg.cond(PsiTPsi) < 1/sys.float_info.epsilon:
        # faster
        M = sp.linalg.solve(PsiTPsi,
                            sp.sparse.eye(PsiTPsi.shape[0]).toarray())
    else:
        # stabler
        M = np.linalg.pinv(PsiTPsi)

    # h factor (the full matrix is not calculated explicitly,
    # only the trace is, to save memory)
    PsiM = np.dot(psi_sparse, M)

    h = np.sum(np.multiply(PsiM, psi_sparse), axis=1, dtype=np.float128)

    # ------ Calculate Error Loocv for each measurement point ----
    # Residuals
    if isinstance(clf, list):
        residual = np.dot(psi, coeffs) - y
    else:
        residual = clf.predict(psi) - y

    # Variance
    varY = np.var(y)

    if varY == 0:
        normEmpErr = 0
        ErrLoo = 0
        LCerror = np.zeros((y.shape))
    else:
        normEmpErr = np.mean(residual**2)/varY

        # LCerror = np.divide(residual, (1-h))
        LCerror = residual / (1-h)[:, np.newaxis]
        ErrLoo = np.mean(np.square(LCerror)) / varY
        # if there are NaNs, just return an infinite LOO error (this
        # happens, e.g., when a strongly underdetermined problem is solved)
        if np.isnan(ErrLoo):
            ErrLoo = np.inf

    # Corrected Error for over-determined system
    trM = np.trace(M)
    if trM < 0 or abs(trM) > 1e6:
        trM = np.trace(np.linalg.pinv(np.dot(psi.T, psi)))

    # Over-determined system of Equation
    if N > P:
        T_factor = N/(N-P) * (1 + trM)

    # Under-determined system of Equation
    else:
        T_factor = np.inf

    CorrectedErrLoo = ErrLoo * T_factor

    Q_2 = 1 - CorrectedErrLoo

    return Q_2, residual
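The analytic leave-one-out error avoids N refits: with the leverages h_i = psi_i^T (Psi^T Psi)^{-1} psi_i, each LOO residual equals residual_i / (1 - h_i), and the over-determined correction factor is N/(N-P) * (1 + tr(M)). A compact self-contained restatement for a plain OLS fit:

    import numpy as np

    rng = np.random.default_rng(2)
    N, P = 40, 6
    psi = rng.normal(size=(N, P))
    y = psi @ rng.normal(size=P) + 0.1 * rng.normal(size=N)

    M = np.linalg.inv(psi.T @ psi)
    coeffs = M @ psi.T @ y
    h = np.sum((psi @ M) * psi, axis=1)     # leverages (diag of hat matrix)
    loo_resid = (psi @ coeffs - y) / (1 - h)
    err_loo = np.mean(loo_resid**2) / np.var(y)
    T_factor = N / (N - P) * (1 + np.trace(M))
    Q_2 = 1 - err_loo * T_factor
    print(f"Q^2 = {Q_2:.4f}")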
def pca_transformation(self, Output)

Transforms the targets (outputs) via Principal Component Analysis.

Parameters

Output : array of shape (n_samples, n_features)
Target values.

Returns

pca : obj
Fitted sklearnPCA object.
OutputMatrix : array of shape (n_samples, n_components)
Transformed target values.
Expand source code
def pca_transformation(self, Output):
    """
    Transforms the targets (outputs) via Principal Component Analysis.

    Parameters
    ----------
    Output : array of shape (n_samples, n_features)
        Target values.

    Returns
    -------
    pca : obj
        Fitted sklearnPCA object.
    OutputMatrix : array of shape (n_samples, n_components)
        Transformed target values.

    """
    # Transform via Principal Component Analysis
    if hasattr(self, 'var_pca_threshold'):
        var_pca_threshold = self.var_pca_threshold
    else:
        var_pca_threshold = 100.0
    n_samples, n_features = Output.shape

    if hasattr(self, 'n_pca_components'):
        n_pca_components = self.n_pca_components
    else:
        # Instantiate and fit sklearnPCA object
        covar_matrix = sklearnPCA(n_components=None)
        covar_matrix.fit(Output)
        var = np.cumsum(np.round(covar_matrix.explained_variance_ratio_,
                                 decimals=5)*100)
        # Find the number of components needed to explain
        # var_pca_threshold percent of the variance
        try:
            n_components = np.where(var >= var_pca_threshold)[0][0] + 1
        except IndexError:
            n_components = min(n_samples, n_features)

        n_pca_components = min(n_samples, n_features, n_components)

    # Print out a report
    print()
    print('-' * 50)
    print(f"PCA transformation is performed with {n_pca_components}"
          " components.")
    print('-' * 50)
    print()

    # Fit and transform with the selected number of components
    pca = sklearnPCA(n_components=n_pca_components,
                     svd_solver='randomized')
    OutputMatrix = pca.fit_transform(Output)

    return pca, OutputMatrix
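The number of retained components is the smallest count whose cumulative explained variance (in percent) reaches var_pca_threshold. A standalone sketch of the same selection rule with placeholder data:

    import numpy as np
    from sklearn.decomposition import PCA

    rng = np.random.default_rng(3)
    Output = rng.normal(size=(100, 20))          # placeholder model outputs
    var_pca_threshold = 99.0                     # percent

    cum_var = np.cumsum(PCA().fit(Output).explained_variance_ratio_) * 100
    n_components = int(np.searchsorted(cum_var, var_pca_threshold) + 1)
    reduced = PCA(n_components=n_components).fit_transform(Output)
    print(n_components, reduced.shape)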
def gaussian_process_emulator(self, X, y, nug_term=None, autoSelect=False, varIdx=None)

Fits a Gaussian Process Emulator to the target given the training points.

Parameters

X : array of shape (n_samples, n_params)
Training points.
y : array of shape (n_samples,)
Target values.
nug_term : float, optional
Nugget term. The default is None, i.e. variance of y.
autoSelect : bool, optional
Loop over some kernels and select the best. The default is False.
varIdx : int, optional
The index number. The default is None.

Returns

gp : object
Fitted estimator.
Expand source code
def gaussian_process_emulator(self, X, y, nug_term=None, autoSelect=False,
                              varIdx=None):
    """
    Fits a Gaussian Process Emulator to the target given the training
    points.

    Parameters
    ----------
    X : array of shape (n_samples, n_params)
        Training points.
    y : array of shape (n_samples,)
        Target values.
    nug_term : float, optional
        Nugget term. The default is None, i.e. variance of y.
    autoSelect : bool, optional
        Loop over some kernels and select the best. The default is False.
    varIdx : int, optional
        The index number. The default is None.

    Returns
    -------
    gp : object
        Fitted estimator.

    """

    nug_term = nug_term if nug_term else np.var(y)

    Kernels = [nug_term * kernels.RBF(length_scale=1.0,
                                      length_scale_bounds=(1e-25, 1e15)),
               nug_term * kernels.RationalQuadratic(length_scale=0.2,
                                                    alpha=1.0),
               nug_term * kernels.Matern(length_scale=1.0,
                                         length_scale_bounds=(1e-15, 1e5),
                                         nu=1.5)]

    # Automatic selection of the kernel
    if autoSelect:
        gp = {}
        BME = []
        for i, kernel in enumerate(Kernels):
            gp[i] = GaussianProcessRegressor(kernel=kernel,
                                             n_restarts_optimizer=3,
                                             normalize_y=False)

            # Fit to data using Maximum Likelihood Estimation
            gp[i].fit(X, y)

            # Store the MLE as BME score
            BME.append(gp[i].log_marginal_likelihood())

        gp = gp[np.argmax(BME)]

    else:
        gp = GaussianProcessRegressor(kernel=Kernels[0],
                                      n_restarts_optimizer=3,
                                      normalize_y=False)
        gp.fit(X, y)

    # Compute score
    if varIdx is not None:
        Score = gp.score(X, y)
        print('-'*50)
        print(f'Output variable {varIdx}:')
        print('The estimation of GPE coefficients converged,')
        print(f'with the R^2 score: {Score:.3f}')
        print('-'*50)

    return gp
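A usage sketch, assuming `metamodel` is a configured MetaModel; the training data below are placeholders:

    import numpy as np
    from sklearn.preprocessing import MinMaxScaler

    rng = np.random.default_rng(4)
    X = rng.uniform(size=(30, 2))                       # placeholder inputs
    y = np.sin(6 * X[:, 0]) + 0.1 * rng.normal(size=30)

    X_s = MinMaxScaler().fit_transform(X)               # scale as in training
    gp = metamodel.gaussian_process_emulator(X_s, y, autoSelect=True)
    y_hat, y_std = gp.predict(X_s, return_std=True)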
def eval_metamodel(self, samples=None, nsamples=None, sampling_method='random', return_samples=False)

Evaluates the meta-model at the requested samples. Alternatively, a number of samples (nsamples) can be generated and evaluated.

Parameters

samples : array of shape (n_samples, n_params), optional
Samples to evaluate meta-model at. The default is None.
nsamples : int, optional
Number of samples to generate, if no samples is provided. The default is None.
sampling_method : str, optional
Type of sampling, if no samples is provided. The default is 'random'.
return_samples : bool, optional
Return the generated samples, if no samples are provided. The default is False.

Returns

mean_pred : dict
Mean of the predictions.
std_pred : dict
Standard deviation of the predictions.
Expand source code
def eval_metamodel(self, samples=None, nsamples=None,
                   sampling_method='random', return_samples=False):
    """
    Evaluates the meta-model at the requested samples. Alternatively, a
    number of samples (`nsamples`) can be generated and evaluated.

    Parameters
    ----------
    samples : array of shape (n_samples, n_params), optional
        Samples to evaluate meta-model at. The default is None.
    nsamples : int, optional
        Number of samples to generate, if no `samples` is provided. The
        default is None.
    sampling_method : str, optional
        Type of sampling, if no `samples` is provided. The default is
        'random'.
    return_samples : bool, optional
        Return the generated samples, if no `samples` are provided. The
        default is False.

    Returns
    -------
    mean_pred : dict
        Mean of the predictions.
    std_pred : dict
        Standard deviation of the predictions.
    """
    if self.meta_model_type.lower() == 'gpe':
        model_dict = self.gp_poly
    else:
        model_dict = self.coeffs_dict

    if samples is None:
        if nsamples is None:
            self.n_samples = 100000
        else:
            self.n_samples = nsamples

        samples = self.ExpDesign.generate_samples(self.n_samples,
                                                  sampling_method)
    else:
        self.samples = samples
        self.n_samples = len(samples)

    # Transform samples
    samples = self.ExpDesign.transform(samples)

    if self.meta_model_type.lower() != 'gpe':
        univ_p_val = self.univ_basis_vals(samples,
                                          n_max=np.max(self.pce_deg))

    mean_pred = {}
    std_pred = {}

    # Loop over outputs
    for output, values in model_dict.items():

        mean = np.zeros((len(samples), len(values)))
        std = np.zeros((len(samples), len(values)))
        idx = 0
        for in_key, InIdxValues in values.items():

            # Prediction with GPE
            if self.meta_model_type.lower() == 'gpe':
                X_T = self.x_scaler[output].transform(samples)
                gp = self.gp_poly[output][in_key]
                y_mean, y_std = gp.predict(X_T, return_std=True)

            else:
                # Prediction with PCE or PCE-Kriging
                # Assemble Psi matrix
                psi = self.create_psi(self.basis_dict[output][in_key],
                                      univ_p_val)
                # Prediction
                try:
                    # with error bar
                    clf_poly = self.clf_poly[output][in_key]
                    y_mean, y_std = clf_poly.predict(psi, return_std=True)

                except Exception:
                    # without error bar
                    coeffs = self.coeffs_dict[output][in_key]
                    y_mean = np.dot(psi, coeffs)
                    y_std = np.zeros_like(y_mean)

            mean[:, idx] = y_mean
            std[:, idx] = y_std
            idx += 1

        if self.dim_red_method.lower() == 'pca':
            PCA = self.pca[output]
            mean_pred[output] = PCA.mean_ + np.dot(mean, PCA.components_)
            std_pred[output] = np.sqrt(np.dot(std**2, PCA.components_**2))
        else:
            mean_pred[output] = mean
            std_pred[output] = std

    if return_samples:
        return mean_pred, std_pred, samples
    else:
        return mean_pred, std_pred
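A usage sketch for a trained meta-model; the output key 'Z' and the parameter bounds are hypothetical:

    import numpy as np

    # Evaluate at user-supplied parameter sets (original, untransformed space)
    samples = np.random.uniform(-5, 5, size=(1000, 3))
    mean_pred, std_pred = metamodel.eval_metamodel(samples=samples)
    print(mean_pred['Z'].shape)          # (1000, n_points) for output 'Z'

    # Or let the meta-model draw its own samples and return them
    mean_pred, std_pred, samples = metamodel.eval_metamodel(
        nsamples=500, sampling_method='random', return_samples=True)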
def create_model_error(self, X, y, name='Calib')

Fits a GPE-based model error.

Parameters

X : array of shape (n_outputs, n_inputs)
Input array. It can contain any forcing inputs or coordinates of extracted data.
y : array of shape (n_outputs,)
The model response for the MAP parameter set.
name : str, optional
Calibration or validation. The default is 'Calib'.

Returns

self : object
Self object.
Expand source code
def create_model_error(self, X, y, name='Calib'):
    """
    Fits a GPE-based model error.

    Parameters
    ----------
    X : array of shape (n_outputs, n_inputs)
        Input array. It can contain any forcing inputs or coordinates of
        extracted data.
    y : array of shape (n_outputs,)
        The model response for the MAP parameter set.
    name : str, optional
        Calibration or validation. The default is `'Calib'`.

    Returns
    -------
    self: object
        Self object.

    """
    Model = self.ModelObj
    outputNames = Model.Output.names
    self.errorRegMethod = 'GPE'
    self.errorclf_poly = self.auto_vivification()
    self.errorScale = self.auto_vivification()

    # Read data
    MeasuredData = Model.read_observation(case=name)

    # Fitting GPR based bias model
    for out in outputNames:
        nan_idx = ~np.isnan(MeasuredData[out])
        # Select data
        try:
            data = MeasuredData[out].values[nan_idx]
        except AttributeError:
            data = MeasuredData[out][nan_idx]

        # Prepare the input matrix
        scaler = MinMaxScaler()
        delta = data  # - y[out][0]
        BiasInputs = np.hstack((X[out], y[out].reshape(-1, 1)))
        X_S = scaler.fit_transform(BiasInputs)
        gp = self.gaussian_process_emulator(X_S, delta)

        self.errorScale[out]["y_1"] = scaler
        self.errorclf_poly[out]["y_1"] = gp

    return self
def eval_model_error(self, X, y_pred)

Evaluates the error model.

Parameters

X : array
Inputs.
y_pred : dict
Predictions.

Returns

mean_pred : dict
Mean prediction of the GPE-based error model.
std_pred : dict
Standard deviation of the GPE-based error model.
Expand source code
def eval_model_error(self, X, y_pred):
    """
    Evaluates the error model.

    Parameters
    ----------
    X : array
        Inputs.
    y_pred : dict
        Predictions.

    Returns
    -------
    mean_pred : dict
        Mean prediction of the GPE-based error model.
    std_pred : dict
        Standard deviation of the GPE-based error model.

    """
    mean_pred = {}
    std_pred = {}

    for Outkey, ValuesDict in self.errorclf_poly.items():

        pred_mean = np.zeros_like(y_pred[Outkey])
        pred_std = np.zeros_like(y_pred[Outkey])

        for Inkey, InIdxValues in ValuesDict.items():

            gp = self.errorclf_poly[Outkey][Inkey]
            scaler = self.errorScale[Outkey][Inkey]

            # Transform Samples using scaler
            for j, pred in enumerate(y_pred[Outkey]):
                BiasInputs = np.hstack((X[Outkey], pred.reshape(-1, 1)))
                Samples_S = scaler.transform(BiasInputs)
                y_hat, y_std = gp.predict(Samples_S, return_std=True)
                pred_mean[j] = y_hat
                pred_std[j] = y_std
                # pred_mean[j] += pred

        mean_pred[Outkey] = pred_mean
        std_pred[Outkey] = pred_std

    return mean_pred, std_pred
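A schematic sketch chaining create_model_error and eval_model_error. It assumes the forward model has measurement data registered for a hypothetical output 'Z'; all arrays below are placeholders:

    import numpy as np

    x_coords = np.linspace(0.0, 1.0, 10)
    X = {'Z': x_coords.reshape(-1, 1)}       # coordinates of extracted data
    y_map = {'Z': np.random.rand(10)}        # model response at the MAP point
    metamodel.create_model_error(X, y_map, name='Calib')

    # Correct new predictions with the fitted GP bias model
    y_pred = {'Z': np.random.rand(5, 10)}    # five candidate predictions
    bias_mean, bias_std = metamodel.eval_model_error(X, y_pred)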
class PostProcessing (MetaModel, name='calib')

This class provides many helper functions to post-process the trained meta-model.

Attributes

MetaModel : obj
MetaModel object to do postprocessing on.
name : str
Type of the analysis. The default is 'calib'. If a validation is expected to be performed, change this to 'valid'.
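A typical post-processing sequence for a trained meta-model `metamodel` (a sketch; the calls below use the methods documented in this class):

    from bayesvalidrox import PostProcessing

    post = PostProcessing(metamodel, name='calib')
    means, stds = post.plot_moments(xlabel='Time [s]')     # pdf report
    post.valid_metamodel(n_samples=3)                      # visual check
    rmse, valid_error = post.check_accuracy(n_samples=200)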
Expand source code
class PostProcessing:
    """
    This class provides many helper functions to post-process the trained
    meta-model.

    Attributes
    ----------
    MetaModel : obj
        MetaModel object to do postprocessing on.
    name : str
        Type of the analysis. The default is `'calib'`. If a validation is
        expected to be performed, change this to `'valid'`.
    """

    def __init__(self, MetaModel, name='calib'):
        self.MetaModel = MetaModel
        self.name = name

    # -------------------------------------------------------------------------
    def plot_moments(self, xlabel='Time [s]', plot_type=None, save_fig=True):
        """
        Plots the moments in a pdf format in the directory
        `Outputs_PostProcessing`.

        Parameters
        ----------
        xlabel : str, optional
            String to be displayed as x-label. The default is `'Time [s]'`.
        plot_type : str, optional
            Options: bar or line. The default is `None`.
        save_fig : bool, optional
            Save figure or not. The default is `True`.

        Returns
        -------
        pce_means: dict
            Mean of the model outputs.
        pce_stds: dict
            Standard deviation of the model outputs.

        """

        bar_plot = True if plot_type == 'bar' else False
        meta_model_type = self.MetaModel.meta_model_type
        Model = self.MetaModel.ModelObj

        # Read Monte-Carlo reference
        self.mc_reference = Model.read_mc_reference()
        print(self.mc_reference)

        # Set the x values
        x_values_orig = self.MetaModel.ExpDesign.x_values

        # Compute the moments with the PCEModel object
        self._compute_pce_moments()

        # Get the variables
        out_names = Model.Output.names

        # Open a pdf for the plots
        if save_fig:
            newpath = (f'Outputs_PostProcessing_{self.name}/')
            if not os.path.exists(newpath):
                os.makedirs(newpath)

            # create a PdfPages object
            pdf = PdfPages(f'./{newpath}Mean_Std_PCE.pdf')

        # Plot the best fit line, set the linewidth (lw), color and
        # transparency (alpha) of the line
        for idx, key in enumerate(out_names):
            fig, ax = plt.subplots(nrows=1, ncols=2)

            # Extract mean and std
            mean_data = self.pce_means[key]
            std_data = self.pce_stds[key]

            # Extract a list of x values
            if type(x_values_orig) is dict:
                x = x_values_orig[key]
            else:
                x = x_values_orig

            # Plot: bar plot or line plot
            if bar_plot:
                ax[0].bar(list(map(str, x)), mean_data, color='b',
                          width=0.25)
                ax[1].bar(list(map(str, x)), std_data, color='b',
                          width=0.25)
                ax[0].legend(labels=[meta_model_type])
                ax[1].legend(labels=[meta_model_type])
            else:
                ax[0].plot(x, mean_data, lw=3, color='k', marker='x',
                           label=meta_model_type)
                ax[1].plot(x, std_data, lw=3, color='k', marker='x',
                           label=meta_model_type)

            if self.mc_reference is not None:
                if bar_plot:
                    ax[0].bar(list(map(str, x)), self.mc_reference['mean'],
                              color='r', width=0.25)
                    ax[1].bar(list(map(str, x)), self.mc_reference['std'],
                              color='r', width=0.25)
                    ax[0].legend(labels=[meta_model_type])
                    ax[1].legend(labels=[meta_model_type])
                else:
                    ax[0].plot(x, self.mc_reference['mean'], lw=3, marker='x',
                               color='r', label='Ref.')
                    ax[1].plot(x, self.mc_reference['std'], lw=3, marker='x',
                               color='r', label='Ref.')

            # Label the axes and provide a title
            ax[0].set_xlabel(xlabel)
            ax[1].set_xlabel(xlabel)
            ax[0].set_ylabel(key)
            ax[1].set_ylabel(key)

            # Provide a title
            ax[0].set_title('Mean of ' + key)
            ax[1].set_title('Std of ' + key)

            if not bar_plot:
                ax[0].legend(loc='best')
                ax[1].legend(loc='best')

            plt.tight_layout()

            if save_fig:
                # save the current figure
                pdf.savefig(fig, bbox_inches='tight')

                # Destroy the current plot
                plt.clf()

        if save_fig:
            pdf.close()

        return self.pce_means, self.pce_stds

    # -------------------------------------------------------------------------
    def valid_metamodel(self, n_samples=1, samples=None, x_axis='Time [s]'):
        """
        Evaluates and plots the meta model and the PCEModel outputs for the
        given number of samples or the given samples.

        Parameters
        ----------
        n_samples : int, optional
            Number of samples to be evaluated. The default is 1.
        samples : array of shape (n_samples, n_params), optional
            Samples to be evaluated. The default is None.
        x_axis : str, optional
            Label of x axis. The default is `'Time [s]'`.

        Returns
        -------
        None.

        """
        MetaModel = self.MetaModel
        Model = MetaModel.ModelObj

        if samples is None:
            self.n_samples = n_samples
            samples = self._get_sample()
        else:
            self.n_samples = samples.shape[0]

        # Extract x_values
        x_values = MetaModel.ExpDesign.x_values

        self.model_out_dict = self._eval_model(samples, key_str='valid')
        self.pce_out_mean, self.pce_out_std = MetaModel.eval_metamodel(samples)

        try:
            key = Model.Output.names[1]
        except IndexError:
            key = Model.Output.names[0]

        n_obs = self.model_out_dict[key].shape[1]

        if n_obs == 1:
            self._plot_validation()
        else:
            self._plot_validation_multi(x_values=x_values, x_axis=x_axis)

    # -------------------------------------------------------------------------
    def check_accuracy(self, n_samples=None, samples=None, outputs=None):
        """
        Checks accuracy of the metamodel by computing the root mean square
        error and validation error for all outputs.

        Parameters
        ----------
        n_samples : int, optional
            Number of samples. The default is None.
        samples : array of shape (n_samples, n_params), optional
            Parameter sets to be checked. The default is None.
        outputs : dict, optional
            Output dictionary with model outputs for all given output types in
            `Model.Output.names`. The default is None.

        Raises
        ------
        Exception
            When neither n_samples nor samples are provided.

        Returns
        -------
        rmse: dict
            Root mean squared error for each output.
        valid_error : dict
            Validation error for each output.

        """
        MetaModel = self.MetaModel
        Model = MetaModel.ModelObj

        # Set the number of samples
        if n_samples:
            self.n_samples = n_samples
        elif samples is not None:
            self.n_samples = samples.shape[0]
        else:
            raise Exception("Please provide either samples or pass number of "
                            "samples!")

        # Generate random samples if necessary
        Samples = self._get_sample() if samples is None else samples

        # Run the original model with the generated samples
        if outputs is None:
            outputs = self._eval_model(Samples, key_str='validSet')

        # Run the PCE model with the generated samples
        pce_outputs, _ = MetaModel.eval_metamodel(samples=Samples)

        self.rmse = {}
        self.valid_error = {}
        # Loop over the keys and compute RMSE error.
        for key in Model.Output.names:
            # Root mean square error
            self.rmse[key] = mean_squared_error(outputs[key], pce_outputs[key],
                                                squared=False,
                                                multioutput='raw_values')
            # Validation error
            self.valid_error[key] = (self.rmse[key]**2 / self.n_samples) / \
                np.var(outputs[key], ddof=1, axis=0)

            # Print a report table
            print("\n>>>>> Errors of {} <<<<<".format(key))
            print("\nIndex  |  RMSE   |  Validation Error")
            print('-'*35)
            print('\n'.join(f'{i+1}  |  {k:.3e}  |  {j:.3e}' for i, (k, j)
                            in enumerate(zip(self.rmse[key],
                                             self.valid_error[key]))))
        # Save error dicts in PCEModel object
        self.MetaModel.rmse = self.rmse
        self.MetaModel.valid_error = self.valid_error

        return self.rmse, self.valid_error

    # -------------------------------------------------------------------------
    def plot_seq_design_diagnostics(self, ref_BME_KLD=None, save_fig=True):
        """
        Plots the Bayesian Model Evidence (BME) and Kullback-Leibler divergence
        (KLD) for the sequential design.

        Parameters
        ----------
        ref_BME_KLD : array, optional
            Reference BME and KLD. The default is `None`.
        save_fig : bool, optional
            Whether to save the figures. The default is `True`.

        Returns
        -------
        None.

        """
        PCEModel = self.MetaModel
        n_init_samples = PCEModel.ExpDesign.n_init_samples
        n_total_samples = PCEModel.ExpDesign.X.shape[0]

        if save_fig:
            newpath = f'Outputs_PostProcessing_{self.name}/'
            if not os.path.exists(newpath):
                os.makedirs(newpath)

            # create a PdfPages object
            pdf = PdfPages(f'./{newpath}/seqPCEModelDiagnostics.pdf')

        plotList = ['Modified LOO error', 'Validation error', 'KLD', 'BME',
                    'RMSEMean', 'RMSEStd', 'Hellinger distance']
        seqList = [PCEModel.SeqModifiedLOO, PCEModel.seqValidError,
                   PCEModel.SeqKLD, PCEModel.SeqBME, PCEModel.seqRMSEMean,
                   PCEModel.seqRMSEStd, PCEModel.SeqDistHellinger]

        markers = ('x', 'o', 'd', '*', '+')
        colors = ('k', 'darkgreen', 'b', 'navy', 'darkred')

        # Plot the evolution of the diagnostic criteria of the
        # Sequential Experimental Design.
        for plotidx, plot in enumerate(plotList):
            fig, ax = plt.subplots()
            seq_dict = seqList[plotidx]
            name_util = list(seq_dict.keys())

            if len(name_util) == 0:
                continue

            # Box plot when Replications have been detected.
            if any(int(name.split("rep_", 1)[1]) > 1 for name in name_util):
                # Extract the values from dict
                sorted_seq_opt = {}
                # Number of replications
                n_reps = PCEModel.ExpDesign.nReprications

                # Get the list of utility function names
                # Handle if only one UtilityFunction is provided
                if not isinstance(PCEModel.ExpDesign.UtilityFunction, list):
                    util_funcs = [PCEModel.ExpDesign.UtilityFunction]
                else:
                    util_funcs = PCEModel.ExpDesign.UtilityFunction

                for util in util_funcs:
                    sortedSeq = {}
                    # min number of runs available from reps
                    n_runs = min([seq_dict[f'{util}_rep_{i+1}'].shape[0]
                                 for i in range(n_reps)])

                    for runIdx in range(n_runs):
                        values = []
                        for key in seq_dict.keys():
                            if util in key:
                                values.append(seq_dict[key][runIdx].mean())
                        sortedSeq['SeqItr_'+str(runIdx)] = np.array(values)
                    sorted_seq_opt[util] = sortedSeq

                # BoxPlot
                def draw_plot(data, labels, edge_color, fill_color, idx):
                    pos = labels - (idx-1)
                    bp = plt.boxplot(data, positions=pos, labels=labels,
                                     patch_artist=True, sym='', widths=0.75)
                    elements = ['boxes', 'whiskers', 'fliers', 'means',
                                'medians', 'caps']
                    for element in elements:
                        plt.setp(bp[element], color=edge_color[idx])

                    for patch in bp['boxes']:
                        patch.set(facecolor=fill_color[idx])

                if PCEModel.ExpDesign.n_new_samples != 1:
                    step1 = PCEModel.ExpDesign.n_new_samples
                    step2 = 1
                else:
                    step1 = 5
                    step2 = 5
                edge_color = ['red', 'blue', 'green']
                fill_color = ['tan', 'cyan', 'lightgreen']
                plot_label = plot
                # Plot for different Utility Functions
                for idx, util in enumerate(util_funcs):
                    all_errors = np.empty((n_reps, 0))

                    for key in list(sorted_seq_opt[util].keys()):
                        errors = sorted_seq_opt.get(util, {}).get(key)[:, None]
                        all_errors = np.hstack((all_errors, errors))

                    # Special cases for BME and KLD
                    if plot == 'KLD' or plot == 'BME':
                        # BME convergence if refBME is provided
                        if ref_BME_KLD is not None:
                            if plot == 'BME':
                                refValue = ref_BME_KLD[0]
                                plot_label = r'$BME/BME^{Ref.}$'
                            if plot == 'KLD':
                                refValue = ref_BME_KLD[1]
                                plot_label = r'$D_{KL}[p(\theta|y_*),p(\theta)]'\
                                    r' / D_{KL}^{Ref.}[p(\theta|y_*), '\
                                    r'p(\theta)]$'

                            # Difference between BME/KLD and the ref. values
                            all_errors = np.divide(all_errors,
                                                   np.full((all_errors.shape),
                                                           refValue))

                            # Plot baseline for zero, i.e. no difference
                            plt.axhline(y=1.0, xmin=0, xmax=1, c='green',
                                        ls='--', lw=2)

                    # Plot each UtilFuncs
                    labels = np.arange(n_init_samples, n_total_samples+1, step1)
                    draw_plot(all_errors[:, ::step2], labels, edge_color,
                              fill_color, idx)

                plt.xticks(labels, labels)
                # Set the major and minor locators
                ax.xaxis.set_major_locator(ticker.AutoLocator())
                ax.xaxis.set_minor_locator(ticker.AutoMinorLocator())
                ax.xaxis.grid(True, which='major', linestyle='-')
                ax.xaxis.grid(True, which='minor', linestyle='--')

                # Legend
                legend_elements = []
                for idx, util in enumerate(util_funcs):
                    legend_elements.append(Patch(facecolor=fill_color[idx],
                                                 edgecolor=edge_color[idx],
                                                 label=util))
                plt.legend(handles=legend_elements[::-1], loc='best')

                if plot != 'BME' and plot != 'KLD':
                    plt.yscale('log')
                plt.autoscale(True)
                plt.xlabel('\\# of training samples')
                plt.ylabel(plot_label)
                plt.title(plot)

                if save_fig:
                    # save the current figure
                    pdf.savefig(fig, bbox_inches='tight')
                    # Destroy the current plot
                    plt.clf()
                    # Save arrays into files
                    f = open(f'./{newpath}/Seq{plot}.txt', 'w')
                    f.write(str(sorted_seq_opt))
                    f.close()
            else:
                for idx, name in enumerate(name_util):
                    seq_values = seq_dict[name]
                    if PCEModel.ExpDesign.n_new_samples != 1:
                        step = PCEModel.ExpDesign.n_new_samples
                    else:
                        step = 1
                    x_idx = np.arange(n_init_samples, n_total_samples+1, step)
                    if n_total_samples not in x_idx:
                        x_idx = np.hstack((x_idx, n_total_samples))

                    if plot == 'KLD' or plot == 'BME':
                        # BME convergence if refBME is provided
                        if ref_BME_KLD is not None:
                            if plot == 'BME':
                                refValue = ref_BME_KLD[0]
                                plot_label = r'$BME/BME^{Ref.}$'
                            if plot == 'KLD':
                                refValue = ref_BME_KLD[1]
                                plot_label = r'$D_{KL}[p(\theta|y_*),p(\theta)]' \
                                    r' / D_{KL}^{Ref.}[p(\theta|y_*), ' \
                                    r'p(\theta)]$'

                            # Difference between BME/KLD and the ref. values
                            values = np.divide(seq_values,
                                               np.full((seq_values.shape),
                                                       refValue))

                            # Plot baseline for zero, i.e. no difference
                            plt.axhline(y=1.0, xmin=0, xmax=1, c='green',
                                        ls='--', lw=2)

                            # Set the limits
                            plt.ylim([1e-1, 1e1])

                            # Create the plots
                            plt.semilogy(x_idx, values, marker=markers[idx],
                                         color=colors[idx], ls='--', lw=2,
                                         label=name.split("_rep", 1)[0])
                        else:
                            plot_label = plot

                            # Create the plots
                            plt.plot(x_idx, seq_values, marker=markers[idx],
                                     color=colors[idx], ls='--', lw=2,
                                     label=name.split("_rep", 1)[0])

                    else:
                        plot_label = plot
                        seq_values = np.nan_to_num(seq_values)

                        # Plot the error evolution for each output
                        for i in range(seq_values.shape[1]):
                            plt.semilogy(x_idx, seq_values[:, i], ls='--',
                                         lw=2, marker=markers[idx],
                                         color=colors[idx], alpha=0.15)

                        plt.semilogy(x_idx, seq_values, marker=markers[idx],
                                     ls='--', lw=2, color=colors[idx],
                                     label=name.split("_rep", 1)[0])

                # Set the major and minor locators
                ax.xaxis.set_major_locator(ticker.AutoLocator())
                ax.xaxis.set_minor_locator(ticker.AutoMinorLocator())
                ax.xaxis.grid(True, which='major', linestyle='-')
                ax.xaxis.grid(True, which='minor', linestyle='--')

                ax.tick_params(axis='both', which='major', direction='in',
                               width=3, length=10)
                ax.tick_params(axis='both', which='minor', direction='in',
                               width=2, length=8)
                plt.xlabel('Number of runs')
                plt.ylabel(plot_label)
                plt.title(plot)
                plt.legend(frameon=True)

                if save_fig:
                    # save the current figure
                    pdf.savefig(fig, bbox_inches='tight')
                    # Destroy the current plot
                    plt.clf()

                    # ---------------- Saving arrays into files ---------------
                    np.save(f'./{newpath}/Seq{plot}.npy', seq_values)

        # Close the pdf
        pdf.close()
        return

    # -------------------------------------------------------------------------
    def sobol_indices(self, xlabel='Time [s]', plot_type=None, save_fig=True):
        """
        Provides Sobol indices as a sensitivity measure to infer the importance
        of the input parameters. See Eq. 27 in [1] for more details. For the
        case with Principal component analysis refer to [2].

        [1] Global sensitivity analysis: A flexible and efficient framework
        with an example from stochastic hydrogeology S. Oladyshkin, F.P.
        de Barros, W. Nowak  https://doi.org/10.1016/j.advwatres.2011.11.001

        [2] Nagel, J.B., Rieckermann, J. and Sudret, B., 2020. Principal
        component analysis and sparse polynomial chaos expansions for global
        sensitivity analysis and model calibration: Application to urban
        drainage simulation. Reliability Engineering & System Safety, 195,
        p.106737.

        Parameters
        ----------
        xlabel : str, optional
            Label of the x-axis. The default is `'Time [s]'`.
        plot_type : str, optional
            Plot type. The default is `None`. This corresponds to line plot.
            Bar chart can be selected by `bar`.
        save_fig : bool, optional
            Whether to save the figures. The default is `True`.

        Returns
        -------
        sobol_cell: dict
            Sobol indices.
        total_sobol: dict
            Total Sobol indices.

        """
        # Extract the necessary variables
        PCEModel = self.MetaModel
        basis_dict = PCEModel.basis_dict
        coeffs_dict = PCEModel.coeffs_dict
        n_params = PCEModel.n_params
        max_order = np.max(PCEModel.pce_deg)
        self.sobol_cell = {}
        self.total_sobol = {}

        for Output in PCEModel.ModelObj.Output.names:

            n_meas_points = len(coeffs_dict[Output])

            # Initialize the (cell) array containing the (total) Sobol indices.
            sobol_array = dict.fromkeys(range(1, max_order+1), [])
            sobol_cell_array = dict.fromkeys(range(1, max_order+1), [])

            for i_order in range(1, max_order+1):
                n_comb = math.comb(n_params, i_order)

                sobol_cell_array[i_order] = np.zeros((n_comb, n_meas_points))

            total_sobol_array = np.zeros((n_params, n_meas_points))

            # Initialize the cell to store the names of the variables
            TotalVariance = np.zeros((n_meas_points))

            # Loop over all measurement points and calculate sobol indices
            for pIdx in range(n_meas_points):

                # Extract the basis indices (alpha) and coefficients
                Basis = basis_dict[Output][f'y_{pIdx+1}']

                try:
                    clf_poly = PCEModel.clf_poly[Output][f'y_{pIdx+1}']
                    PCECoeffs = clf_poly.coef_
                except Exception:
                    PCECoeffs = coeffs_dict[Output][f'y_{pIdx+1}']

                # Compute total variance
                TotalVariance[pIdx] = np.sum(np.square(PCECoeffs[1:]))
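                # (For a PCE in an orthonormal basis, the output variance is
                #  the sum of the squared non-constant coefficients; each
                #  Sobol index below is a partial sum of squared coefficients
                #  divided by this total variance.)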

                nzidx = np.where(PCECoeffs != 0)[0]
                # Set all the Sobol indices equal to zero in the presence of a
                # null output.
                if len(nzidx) == 0:
                    # This is buggy.
                    for i_order in range(1, max_order+1):
                        sobol_cell_array[i_order][:, pIdx] = 0

                # Otherwise compute them by summing well-chosen coefficients
                else:
                    nz_basis = Basis[nzidx]
                    for i_order in range(1, max_order+1):
                        idx = np.where(np.sum(nz_basis > 0, axis=1) == i_order)
                        subbasis = nz_basis[idx]
                        Z = np.array(list(combinations(range(n_params), i_order)))

                        for q in range(Z.shape[0]):
                            Zq = Z[q]
                            subsubbasis = subbasis[:, Zq]
                            subidx = np.prod(subsubbasis, axis=1) > 0
                            sum_ind = nzidx[idx[0][subidx]]
                            if TotalVariance[pIdx] == 0.0:
                                sobol_cell_array[i_order][q, pIdx] = 0.0
                            else:
                                sobol = np.sum(np.square(PCECoeffs[sum_ind]))
                                sobol /= TotalVariance[pIdx]
                                sobol_cell_array[i_order][q, pIdx] = sobol

                    # Compute the TOTAL Sobol indices.
                    for ParIdx in range(n_params):
                        idx = nz_basis[:, ParIdx] > 0
                        sum_ind = nzidx[idx]

                        if TotalVariance[pIdx] == 0.0:
                            total_sobol_array[ParIdx, pIdx] = 0.0
                        else:
                            sobol = np.sum(np.square(PCECoeffs[sum_ind]))
                            sobol /= TotalVariance[pIdx]
                            total_sobol_array[ParIdx, pIdx] = sobol

                # ----- if PCA selected: Compute covariance -----
                if PCEModel.dim_red_method.lower() == 'pca':
                    cov_Z_p_q = np.zeros((n_params))
                    # Extract the basis indices (alpha) and coefficients for 
                    # next component
                    if pIdx < n_meas_points-1:
                        nextBasis = basis_dict[Output][f'y_{pIdx+2}']

                        try:
                            clf_poly = PCEModel.clf_poly[Output][f'y_{pIdx+2}']
                            nextPCECoeffs = clf_poly.coef_
                        except Exception:
                            nextPCECoeffs = coeffs_dict[Output][f'y_{pIdx+2}']

                        # Choose the common non-zero basis
                        mask = (Basis[:, None] == nextBasis).all(-1).any(-1)
                        similar_basis = Basis[mask]
                        # Compute the TOTAL Sobol indices.
                        for ParIdx in range(n_params):
                            idx = similar_basis[:, ParIdx] > 0
                            try:
                                sum_is = nzidx[idx]
                                cov_Z_p_q[ParIdx] = np.sum(PCECoeffs[sum_ind] *
                                                           nextPCECoeffs[sum_is])
                            except Exception:
                                cov_Z_p_q[ParIdx] = 0.0

            # Compute the sobol indices according to Ref. 2
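            # (term1 maps each component's Sobol index back to the original
            #  outputs, weighted by the component variance and the squared
            #  PCA loadings relative to the output variance; term2 accounts
            #  for covariances between consecutive components.)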
            if PCEModel.dim_red_method.lower() == 'pca':
                n_c_points = PCEModel.ExpDesign.Y[Output].shape[1]
                PCA = PCEModel.pca[Output]
                compPCA = PCA.components_
                nComp = compPCA.shape[0]
                var_Z_p = PCA.explained_variance_

                # Extract the sobol index of the components
                for i_order in range(1, max_order+1):
                    n_comb = math.comb(n_params, i_order)
                    sobol_array[i_order] = np.zeros((n_comb, n_c_points))
                    Z = np.array(list(combinations(range(n_params), i_order)))

                    for q in range(Z.shape[0]):
                        S_Z_i = sobol_cell_array[i_order][q]

                        for tIdx in range(n_c_points):
                            var_Y_t = np.var(PCEModel.ExpDesign.Y[Output][:, tIdx])
                            if var_Y_t == 0.0:
                                term1, term2 = 0.0, 0.0
                            else:
                                term1 = np.sum([S_Z_i[i]*(var_Z_p[i]*(compPCA[i, tIdx]**2)/var_Y_t) for i in range(nComp)])

                                # Term 2
                                # cov_Z_p_q = np.ones((nComp))# TODO: from coeffs
                                Phi_t_p = compPCA[:nComp-1]
                                Phi_t_q = compPCA
                                term2 = 2 * np.sum([cov_Z_p_q[ParIdx] * Phi_t_p[i,tIdx] * Phi_t_q[i,tIdx]/var_Y_t for i in range(nComp-1)])

                            sobol_array[i_order][q, tIdx] = term1 #+ term2

                # Compute the TOTAL Sobol indices.
                total_sobol = np.zeros((n_params, n_c_points))
                for ParIdx in range(n_params):
                    S_Z_i = total_sobol_array[ParIdx]

                    for tIdx in range(n_c_points):
                        var_Y_t = np.var(PCEModel.ExpDesign.Y[Output][:, tIdx])
                        if var_Y_t == 0.0:
                            term1, term2 = 0.0, 0.0
                        else:
                            term1 = 0
                            for i in range(nComp):
                                term1 += S_Z_i[i] * var_Z_p[i] * \
                                    (compPCA[i, tIdx]**2) / var_Y_t

                            # Term 2
                            # cov_Z_p_q = np.ones((nComp))# TODO: from coeffs
                            Phi_t_p = compPCA[:nComp-1]
                            Phi_t_q = compPCA
                            term2 = 0
                            for i in range(nComp-1):
                                term2 += cov_Z_p_q[ParIdx] * Phi_t_p[i, tIdx] \
                                    * Phi_t_q[i, tIdx] / var_Y_t
                            term2 *= 2

                        total_sobol[ParIdx, tIdx] = term1 + term2

                self.sobol_cell[Output] = sobol_array
                self.total_sobol[Output] = total_sobol
            else:
                self.sobol_cell[Output] = sobol_cell_array
                self.total_sobol[Output] = total_sobol_array

        # ---------------- Plot -----------------------
        par_names = PCEModel.ExpDesign.par_names
        x_values_orig = PCEModel.ExpDesign.x_values

        cases = ['']

        for case in cases:
            newpath = (f'Outputs_PostProcessing_{self.name}/')
            if not os.path.exists(newpath):
                os.makedirs(newpath)

            # File name prefix (empty unless a validation case is given)
            name = case + '_' if 'Valid' in cases else ''

            if save_fig:
                # create a PdfPages object
                pdf = PdfPages('./' + newpath + name + 'Sobol_indices.pdf')

            fig = plt.figure()

            for outIdx, Output in enumerate(PCEModel.ModelObj.Output.names):

                # Extract total Sobol indices
                total_sobol = self.total_sobol[Output]

                # Extract a list of x values
                if type(x_values_orig) is dict:
                    x = x_values_orig[Output]
                else:
                    x = x_values_orig

                if plot_type == 'bar':
                    ax = fig.add_axes([0, 0, 1, 1])
                    dict1 = {xlabel: x}
                    dict2 = {param: sobolIndices for param, sobolIndices
                             in zip(par_names, total_sobol)}

                    df = pd.DataFrame({**dict1, **dict2})
                    df.plot(x=xlabel, y=par_names, kind="bar", ax=ax, rot=0,
                            colormap='Dark2')
                    ax.set_ylabel('Total Sobol indices, $S^T$')

                else:
                    for i, sobolIndices in enumerate(total_sobol):
                        plt.plot(x, sobolIndices, label=par_names[i],
                                 marker='x', lw=2.5)

                    plt.ylabel('Total Sobol indices, $S^T$')
                    plt.xlabel(xlabel)

                plt.title(f'Sensitivity analysis of {Output}')
                if plot_type != 'bar':
                    plt.legend(loc='best', frameon=True)

                # Save indices
                np.savetxt(f'./{newpath}{name}totalsobol_' +
                           Output.replace('/', '_') + '.csv',
                           total_sobol.T, delimiter=',',
                           header=','.join(par_names), comments='')

                if save_fig:
                    # save the current figure
                    pdf.savefig(fig, bbox_inches='tight')

                    # Destroy the current plot
                    plt.clf()

            pdf.close()

        return self.sobol_cell, self.total_sobol

    # -------------------------------------------------------------------------
    def check_reg_quality(self, n_samples=1000, samples=None, save_fig=True):
        """
        Checks the quality of the metamodel for single output models based on:
        https://towardsdatascience.com/how-do-you-check-the-quality-of-your-regression-model-in-python-fa61759ff685


        Parameters
        ----------
        n_samples : int, optional
            Number of parameter sets to use for the check. The default is 1000.
        samples : array of shape (n_samples, n_params), optional
            Parameter sets to use for the check. The default is None.
        save_fig : bool, optional
            Whether to save the figures. The default is True.

        Returns
        -------
        None.

        """
        MetaModel = self.MetaModel

        if samples is None:
            self.n_samples = n_samples
            samples = self._get_sample()
        else:
            self.n_samples = samples.shape[0]

        # Evaluate the original and the surrogate model
        y_val = self._eval_model(samples, key_str='valid')
        y_pce_val, _ = MetaModel.eval_metamodel(samples=samples)

        # Open a pdf for the plots
        if save_fig:
            newpath = (r'Outputs_PostProcessing_{0}/'.format(self.name))
            if not os.path.exists(newpath):
                os.makedirs(newpath)

        # Fit the data(train the model)
        for key in y_pce_val.keys():

            y_pce_val_ = y_pce_val[key]
            y_val_ = y_val[key]

            # ------ Residuals vs. predicting variables ------
            # Check the assumptions of linearity and independence
            fig1 = plt.figure()
            plt.title(key+": Residuals vs. Predicting variables")
            residuals = y_val_ - y_pce_val_
            plt.scatter(x=y_val_, y=residuals, color='blue', edgecolor='k')
            plt.grid(True)
            xmin, xmax = min(y_val_), max(y_val_)
            plt.hlines(y=0, xmin=xmin*0.9, xmax=xmax*1.1, color='red', lw=3,
                       linestyle='--')
            plt.xlabel(key)
            plt.ylabel('Residuals')
            plt.show()

            if save_fig:
                # save the current figure
                fig1.savefig(f'./{newpath}/Residuals_vs_PredVariables.pdf',
                             bbox_inches='tight')
                # Destroy the current plot
                plt.clf()

            # ------ Fitted vs. residuals ------
            # Check the assumptions of linearity and independence
            fig2 = plt.figure()
            plt.title(key + ": Residuals vs. fitted values")
            plt.scatter(x=y_pce_val_, y=residuals, color='blue', edgecolor='k')
            plt.grid(True)
            xmin, xmax = min(y_pce_val_), max(y_pce_val_)
            plt.hlines(y=0, xmin=xmin*0.9, xmax=xmax*1.1, color='red', lw=3,
                       linestyle='--')
            plt.xlabel('Fitted values')
            plt.ylabel('Residuals')
            plt.show()

            if save_fig:
                # save the current figure
                fig2.savefig(f'./{newpath}/Fitted_vs_Residuals.pdf',
                             bbox_inches='tight')
                # Destroy the current plot
                plt.clf()

            # ------ Histogram of normalized residuals ------
            fig3 = plt.figure()
            resid_pearson = residuals / (max(residuals)-min(residuals))
            plt.hist(resid_pearson, bins=20, edgecolor='k')
            plt.ylabel('Count')
            plt.xlabel('Normalized residuals')
            plt.title(f"{key}: Histogram of normalized residuals")

            # Normality (Shapiro-Wilk) test of the residuals
            ax = plt.gca()
            _, p = stats.shapiro(residuals)
            if p < 0.01:
                annText = "The normality assumption may not hold."
            else:
                annText = "The residuals seem to come from a Gaussian process."
            at = AnchoredText(annText, prop=dict(size=30), frameon=True,
                              loc='upper left')
            at.patch.set_boxstyle("round,pad=0.,rounding_size=0.2")
            ax.add_artist(at)

            plt.show()

            if save_fig:
                # save the current figure
                fig3.savefig(f'./{newpath}/Hist_NormResiduals.pdf',
                             bbox_inches='tight')
                # Destroy the current plot
                plt.clf()

            # ------ Q-Q plot of the normalized residuals ------
            plt.figure()
            fig4 = qqplot(resid_pearson, line='45', fit=True)
            plt.xticks()
            plt.yticks()
            plt.xlabel("Theoretical quantiles")
            plt.ylabel("Sample quantiles")
            plt.title(key+": Q-Q plot of normalized residuals")
            plt.grid(True)
            plt.show()

            if save_fig:
                # save the current figure
                fig4.savefig(f'./{newpath}/QQPlot_NormResiduals.pdf',
                             bbox_inches='tight')
                # Destroy the current plot
                plt.clf()

    # -------------------------------------------------------------------------
    def eval_pce_model_3d(self, save_fig=True):

        self.n_samples = 1000

        PCEModel = self.MetaModel
        Model = self.MetaModel.ModelObj
        n_samples = self.n_samples

        # Create 3D-Grid
        # TODO: Make it general
        x = np.linspace(-5, 10, n_samples)
        y = np.linspace(0, 15, n_samples)

        X, Y = np.meshgrid(x, y)
        PCE_Z = np.zeros((self.n_samples, self.n_samples))
        Model_Z = np.zeros((self.n_samples, self.n_samples))

        for idxMesh in range(self.n_samples):
            sample_mesh = np.vstack((X[:, idxMesh], Y[:, idxMesh])).T

            univ_p_val = PCEModel.univ_basis_vals(sample_mesh)

            for Outkey, ValuesDict in PCEModel.coeffs_dict.items():

                pce_out_mean = np.zeros((len(sample_mesh), len(ValuesDict)))
                pce_out_std = np.zeros((len(sample_mesh), len(ValuesDict)))
                model_outs = np.zeros((len(sample_mesh), len(ValuesDict)))

                for Inkey, InIdxValues in ValuesDict.items():
                    idx = int(Inkey.split('_')[1]) - 1
                    basis_deg_ind = PCEModel.basis_dict[Outkey][Inkey]
                    clf_poly = PCEModel.clf_poly[Outkey][Inkey]

                    PSI_Val = PCEModel.create_psi(basis_deg_ind, univ_p_val)

                    # Prediction with error bars
                    y_mean, y_std = clf_poly.predict(PSI_Val, return_std=True)

                    pce_out_mean[:, idx] = y_mean
                    pce_out_std[:, idx] = y_std

                    # Model evaluation
                    model_out_dict, _ = Model.run_model_parallel(sample_mesh,
                                                                 key_str='Valid3D')
                    model_outs[:, idx] = model_out_dict[Outkey].T

                PCE_Z[:, idxMesh] = y_mean
                Model_Z[:, idxMesh] = model_outs[:, 0]

        # ---------------- 3D plot for PCEModel -----------------------
        fig_PCE = plt.figure()
        ax = plt.axes(projection='3d')
        ax.plot_surface(X, Y, PCE_Z, rstride=1, cstride=1,
                        cmap='viridis', edgecolor='none')
        ax.set_title('PCEModel')
        ax.set_xlabel('$x_1$')
        ax.set_ylabel('$x_2$')
        ax.set_zlabel('$f(x_1,x_2)$')

        plt.grid()
        plt.show()

        if save_fig:
            #  Saving the figure
            newpath = f'Outputs_PostProcessing_{self.name}/'
            if not os.path.exists(newpath):
                os.makedirs(newpath)

            # save the figure to file
            fig_PCE.savefig(f'./{newpath}/3DPlot_PCEModel.pdf', format="pdf",
                            bbox_inches='tight')
            plt.close(fig_PCE)

        # ---------------- 3D plot for Model -----------------------
        fig_Model = plt.figure()
        ax = plt.axes(projection='3d')
        ax.plot_surface(X, Y, Model_Z, rstride=1, cstride=1,
                        cmap='viridis', edgecolor='none')
        ax.set_title('Model')
        ax.set_xlabel('$x_1$')
        ax.set_ylabel('$x_2$')
        ax.set_zlabel('$f(x_1,x_2)$')

        plt.grid()
        plt.show()

        if save_fig:
            # Save the figure
            fig_Model.savefig(f'./{newpath}/3DPlot_Model.pdf', format="pdf",
                              bbox_inches='tight')
            plt.close(fig_Model)

        return

    # -------------------------------------------------------------------------
    def _compute_pce_moments(self):
        """
        Computes the first two moments using the PCE-based meta-model.

        Returns
        -------
        None.

        """

        MetaModel = self.MetaModel
        self.pce_means = {}
        self.pce_stds = {}

        for Outkey, ValuesDict in MetaModel.coeffs_dict.items():

            pce_mean = np.zeros((len(ValuesDict)))
            pce_var = np.zeros((len(ValuesDict)))

            for Inkey, InIdxValues in ValuesDict.items():
                idx = int(Inkey.split('_')[1]) - 1
                coeffs = MetaModel.coeffs_dict[Outkey][Inkey]

                # Mean = c_0
                if coeffs[0] != 0:
                    pce_mean[idx] = coeffs[0]
                else:
                    pce_mean[idx] = MetaModel.clf_poly[Outkey][Inkey].intercept_

                # Var = sum(coeffs[1:]**2)
                pce_var[idx] = np.sum(np.square(coeffs[1:]))

            # Back transformation if PCA is selected.
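            # (The mean maps through the PCA components directly; treating
            #  the component scores as uncorrelated, the variances map
            #  through the squared components.)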
            if MetaModel.dim_red_method.lower() == 'pca':
                PCA = MetaModel.pca[Outkey]
                self.pce_means[Outkey] = PCA.mean_
                self.pce_means[Outkey] += np.dot(pce_mean, PCA.components_)
                self.pce_stds[Outkey] = np.sqrt(np.dot(pce_var,
                                                       PCA.components_**2))
            else:
                self.pce_means[Outkey] = pce_mean
                self.pce_stds[Outkey] = np.sqrt(pce_var)

            # Print a report table
            print("\n>>>>> Moments of {} <<<<<".format(Outkey))
            print("\nIndex  |  Mean   |  Std. deviation")
            print('-'*35)
            print('\n'.join(f'{i+1}  |  {k:.3e}  |  {j:.3e}' for i, (k, j)
                            in enumerate(zip(self.pce_means[Outkey],
                                             self.pce_stds[Outkey]))))
        print('-'*40)

    # -------------------------------------------------------------------------
    def _get_sample(self, n_samples=None):
        """
        Generates random samples taken from the input parameter space.

        Returns
        -------
        samples : array of shape (n_samples, n_params)
            Generated samples.

        """
        if n_samples is None:
            n_samples = self.n_samples
        PCEModel = self.MetaModel
        self.samples = PCEModel.ExpDesign.generate_samples(n_samples, 'random')
        return self.samples

    # -------------------------------------------------------------------------
    def _eval_model(self, samples=None, key_str='Valid'):
        """
        Evaluates the forward model for the given samples or, if none are
        passed, for `self.n_samples` randomly generated ones.

        Parameters
        ----------
        samples : array of shape (n_samples, n_params), optional
            Samples to evaluate the model at. The default is None.
        key_str : str, optional
            Key string passed to the model. The default is 'Valid'.

        Returns
        -------
        model_outs : dict
            Dictionary of results.

        """
        Model = self.MetaModel.ModelObj

        if samples is None:
            samples = self._get_sample()
            self.samples = samples
        else:
            self.n_samples = len(samples)

        model_outs, _ = Model.run_model_parallel(samples, key_str=key_str)

        return model_outs

    # -------------------------------------------------------------------------
    def _plot_validation(self, save_fig=True):
        """
        Plots outputs for visual comparison of the metamodel outputs with
        those of the (full) original model.

        Parameters
        ----------
        save_fig : bool, optional
            Save the plots. The default is True.

        Returns
        -------
        None.

        """
        PCEModel = self.MetaModel

        # get the samples
        x_val = self.samples
        y_pce_val = self.pce_out_mean
        y_val = self.model_out_dict

        # Open a pdf for the plots
        if save_fig:
            newpath = f'Outputs_PostProcessing_{self.name}/'
            if not os.path.exists(newpath):
                os.makedirs(newpath)

            # create a PdfPages object
            pdf1 = PdfPages(f'./{newpath}/Model_vs_PCEModel.pdf')

        fig = plt.figure()
        # Fit the data(train the model)
        for key in y_pce_val.keys():

            y_pce_val_ = y_pce_val[key]
            y_val_ = y_val[key]

            regression_model = LinearRegression()
            regression_model.fit(y_pce_val_, y_val_)

            # Predict
            x_new = np.linspace(np.min(y_pce_val_), np.max(y_pce_val_), 100)
            y_predicted = regression_model.predict(x_new[:, np.newaxis])

            plt.scatter(y_pce_val_, y_val_, color='gold', linewidth=2)
            plt.plot(x_new, y_predicted, color='k')

            # Calculate the adjusted R_squared and RMSE
            # the total number of explanatory variables in the model
            # (not including the constant term)
            length_list = []
            for value in PCEModel.coeffs_dict[key].values():
                length_list.append(len(value))
            n_predictors = min(length_list)
            n_samples = x_val.shape[0]

            R2 = r2_score(y_val_, y_pce_val_)
            AdjR2 = 1 - (1 - R2) * (n_samples - 1) / \
                (n_samples - n_predictors - 1)
            rmse = mean_squared_error(y_pce_val_, y_val_, squared=False)

            plt.annotate(f'RMSE = {rmse:.3f}\n Adjusted $R^2$ = {AdjR2:.3f}',
                         xy=(0.05, 0.85), xycoords='axes fraction')

            plt.ylabel("Original Model")
            plt.xlabel("PCE Model")
            plt.grid()
            plt.show()

            if save_fig:
                # save the current figure
                pdf1.savefig(fig, bbox_inches='tight')

                # Destroy the current plot
                plt.clf()

        # Close the pdfs
        pdf1.close()

    # -------------------------------------------------------------------------
    def _plot_validation_multi(self, x_values=[], x_axis="x [m]", save_fig=True):
        """
        Plots outputs for visual comparison of the metamodel outputs with
        those of the (full) multi-output original model.

        Parameters
        ----------
        x_values : list or array, optional
            List of x values. The default is [].
        x_axis : str, optional
            Label of the x axis. The default is "x [m]".
        save_fig : bool, optional
            Whether to save the figures. The default is True.

        Returns
        -------
        None.

        """
        Model = self.MetaModel.ModelObj

        if save_fig:
            newpath = f'Outputs_PostProcessing_{self.name}/'
            if not os.path.exists(newpath):
                os.makedirs(newpath)

            # create a PdfPages object
            pdf = PdfPages(f'./{newpath}/Model_vs_PCEModel.pdf')

        # List of markers and colors
        color = cycle((['b', 'g', 'r', 'y', 'k']))
        marker = cycle(('x', 'd', '+', 'o', '*'))

        fig = plt.figure()
        # Plot the model vs PCE model
        for keyIdx, key in enumerate(Model.Output.names):

            y_pce_val = self.pce_out_mean[key]
            y_pce_val_std = self.pce_out_std[key]
            y_val = self.model_out_dict[key]
            try:
                x = self.model_out_dict['x_values'][key]
            except (KeyError, IndexError):
                x = x_values

            for idx in range(y_val.shape[0]):
                Color = next(color)
                Marker = next(marker)

                plt.plot(x, y_val[idx], color=Color, marker=Marker,
                         label='$Y_{%s}^M$'%(idx+1))

                plt.plot(x, y_pce_val[idx], color=Color, marker=Marker,
                         linestyle='--',
                         label='$Y_{%s}^{PCE}$'%(idx+1))
                plt.fill_between(x, y_pce_val[idx]-1.96*y_pce_val_std[idx],
                                 y_pce_val[idx]+1.96*y_pce_val_std[idx],
                                 color=Color, alpha=0.15)

            # Calculate the RMSE
            rmse = mean_squared_error(y_pce_val, y_val, squared=False)
            R2 = r2_score(y_val[idx].reshape(-1, 1),
                          y_pce_val[idx].reshape(-1, 1))

            plt.annotate(f'RMSE = {rmse:.3f}\n $R^2$ = {R2:.3f}',
                         xy=(0.2, 0.75), xycoords='axes fraction')

            plt.ylabel(key)
            plt.xlabel(x_axis)
            plt.legend(loc='best')
            plt.grid()

            if save_fig:
                # save the current figure
                pdf.savefig(fig, bbox_inches='tight')
                # Destroy the current plot
                plt.clf()

        pdf.close()

        # Zip the subdirectories
        Model.zip_subdirs(f'{Model.name}valid', f'{Model.name}valid_')

Methods

def plot_moments(self, xlabel='Time [s]', plot_type=None, save_fig=True)

Plots the moments in a pdf format in the directory Outputs_PostProcessing.

Parameters

xlabel : str, optional
String to be displayed as x-label. The default is 'Time [s]'.
plot_type : str, optional
Options: bar or line. The default is None.
save_fig : bool, optional
Save figure or not. The default is True.

Returns

pce_means : dict
Mean of the model outputs.
pce_stds : dict
Standard deviation of the model outputs.
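A minimal usage sketch, assuming a trained MetaModel instance named meta_model and that a PostProcessing object is constructed from it (the constructor signature is not shown on this page):

from bayesvalidrox import PostProcessing

# Assumed constructor call; adapt to the actual PostProcessing signature
post = PostProcessing(meta_model)

# Line plot (default) of mean and standard deviation for every output
pce_means, pce_stds = post.plot_moments(xlabel='Time [s]')

# Bar-chart variant
pce_means, pce_stds = post.plot_moments(plot_type='bar')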
Expand source code
def plot_moments(self, xlabel='Time [s]', plot_type=None, save_fig=True):
    """
    Plots the moments in a pdf format in the directory
    `Outputs_PostProcessing`.

    Parameters
    ----------
    xlabel : str, optional
        String to be displayed as x-label. The default is `'Time [s]'`.
    plot_type : str, optional
        Options: bar or line. The default is `None`.
    save_fig : bool, optional
        Save figure or not. The default is `True`.

    Returns
    -------
    pce_means : dict
        Mean of the model outputs.
    pce_stds : dict
        Standard deviation of the model outputs.

    """

    bar_plot = True if plot_type == 'bar' else False
    meta_model_type = self.MetaModel.meta_model_type
    Model = self.MetaModel.ModelObj

    # Read Monte-Carlo reference
    self.mc_reference = Model.read_mc_reference()
    print(self.mc_reference)

    # Set the x values
    x_values_orig = self.MetaModel.ExpDesign.x_values

    # Compute the moments with the PCEModel object
    self._compute_pce_moments()

    # Get the variables
    out_names = Model.Output.names

    # Open a pdf for the plots
    if save_fig:
        newpath = (f'Outputs_PostProcessing_{self.name}/')
        if not os.path.exists(newpath):
            os.makedirs(newpath)

        # create a PdfPages object
        pdf = PdfPages(f'./{newpath}Mean_Std_PCE.pdf')

    # Plot the best fit line, set the linewidth (lw), color and
    # transparency (alpha) of the line
    for idx, key in enumerate(out_names):
        fig, ax = plt.subplots(nrows=1, ncols=2)

        # Extract mean and std
        mean_data = self.pce_means[key]
        std_data = self.pce_stds[key]

        # Extract a list of x values
        if type(x_values_orig) is dict:
            x = x_values_orig[key]
        else:
            x = x_values_orig

        # Plot: bar plot or line plot
        if bar_plot:
            ax[0].bar(list(map(str, x)), mean_data, color='b',
                      width=0.25)
            ax[1].bar(list(map(str, x)), std_data, color='b',
                      width=0.25)
            ax[0].legend(labels=[meta_model_type])
            ax[1].legend(labels=[meta_model_type])
        else:
            ax[0].plot(x, mean_data, lw=3, color='k', marker='x',
                       label=meta_model_type)
            ax[1].plot(x, std_data, lw=3, color='k', marker='x',
                       label=meta_model_type)

        if self.mc_reference is not None:
            if bar_plot:
                ax[0].bar(list(map(str, x)), self.mc_reference['mean'],
                          color='r', width=0.25)
                ax[1].bar(list(map(str, x)), self.mc_reference['std'],
                          color='r', width=0.25)
                ax[0].legend(labels=[meta_model_type, 'Ref.'])
                ax[1].legend(labels=[meta_model_type, 'Ref.'])
            else:
                ax[0].plot(x, self.mc_reference['mean'], lw=3, marker='x',
                           color='r', label='Ref.')
                ax[1].plot(x, self.mc_reference['std'], lw=3, marker='x',
                           color='r', label='Ref.')

        # Label the axes and provide a title
        ax[0].set_xlabel(xlabel)
        ax[1].set_xlabel(xlabel)
        ax[0].set_ylabel(key)
        ax[1].set_ylabel(key)

        # Provide a title
        ax[0].set_title('Mean of ' + key)
        ax[1].set_title('Std of ' + key)

        if not bar_plot:
            ax[0].legend(loc='best')
            ax[1].legend(loc='best')

        plt.tight_layout()

        if save_fig:
            # save the current figure
            pdf.savefig(fig, bbox_inches='tight')

            # Destroy the current plot
            plt.clf()

    pdf.close()

    return self.pce_means, self.pce_stds
def valid_metamodel(self, n_samples=1, samples=None, x_axis='Time [s]')

Evaluates and plots the original model and the metamodel outputs for the given number of samples or for the given samples.

Parameters

n_samples : int, optional
Number of samples to be evaluated. The default is 1.
samples : array of shape (n_samples, n_params), optional
Samples to be evaluated. The default is None.
x_axis : str, optional
Label of x axis. The default is 'Time [s]'.

Returns

None.
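
A short sketch, reusing the post object from the plot_moments example above; the sample values are purely illustrative:

import numpy as np

# Evaluate the original model and the metamodel at 3 random parameter
# sets and plot the comparison
post.valid_metamodel(n_samples=3, x_axis='Time [s]')

# Or pass explicit parameter sets of shape (n_samples, n_params)
my_samples = np.array([[0.1, 2.0], [0.3, 1.5]])
post.valid_metamodel(samples=my_samples)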

Expand source code
def valid_metamodel(self, n_samples=1, samples=None, x_axis='Time [s]'):
    """
    Evaluates and plots the original model and the metamodel outputs for
    the given number of samples or for the given samples.

    Parameters
    ----------
    n_samples : int, optional
        Number of samples to be evaluated. The default is 1.
    samples : array of shape (n_samples, n_params), optional
        Samples to be evaluated. The default is None.
    x_axis : str, optional
        Label of x axis. The default is `'Time [s]'`.

    Returns
    -------
    None.

    """
    MetaModel = self.MetaModel
    Model = MetaModel.ModelObj

    if samples is None:
        self.n_samples = n_samples
        samples = self._get_sample()
    else:
        self.n_samples = samples.shape[0]

    # Extract x_values
    x_values = MetaModel.ExpDesign.x_values

    self.model_out_dict = self._eval_model(samples, key_str='valid')
    self.pce_out_mean, self.pce_out_std = MetaModel.eval_metamodel(samples)

    try:
        key = Model.Output.names[1]
    except IndexError:
        key = Model.Output.names[0]

    n_obs = self.model_out_dict[key].shape[1]

    if n_obs == 1:
        self._plot_validation()
    else:
        self._plot_validation_multi(x_values=x_values, x_axis=x_axis)
def check_accuracy(self, n_samples=None, samples=None, outputs=None)

Checks accuracy of the metamodel by computing the root mean square error and validation error for all outputs.

Parameters

n_samples : int, optional
Number of samples. The default is None.
samples : array of shape (n_samples, n_params), optional
Parameter sets to be checked. The default is None.
outputs : dict, optional
Output dictionary with model outputs for all given output types in Model.Output.names. The default is None.

Raises

Exception
When neither n_samples nor samples are provided.

Returns

rmse : dict
Root mean squared error for each output.
valid_error : dict
Validation error for each output.
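
A usage sketch (post as above; my_samples and my_outputs are illustrative placeholders):

# Draw 200 random parameter sets, run both models and report the errors
rmse, valid_error = post.check_accuracy(n_samples=200)

# Reuse precomputed model outputs to avoid rerunning the forward model
rmse, valid_error = post.check_accuracy(samples=my_samples,
                                        outputs=my_outputs)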
Expand source code
def check_accuracy(self, n_samples=None, samples=None, outputs=None):
    """
    Checks accuracy of the metamodel by computing the root mean square
    error and validation error for all outputs.

    Parameters
    ----------
    n_samples : int, optional
        Number of samples. The default is None.
    samples : array of shape (n_samples, n_params), optional
        Parameter sets to be checked. The default is None.
    outputs : dict, optional
        Output dictionary with model outputs for all given output types in
        `Model.Output.names`. The default is None.

    Raises
    ------
    Exception
        When neither n_samples nor samples are provided.

    Returns
    -------
    rmse: dict
        Root mean squared error for each output.
    valid_error : dict
        Validation error for each output.

    """
    MetaModel = self.MetaModel
    Model = MetaModel.ModelObj

    # Set the number of samples
    if n_samples:
        self.n_samples = n_samples
    elif samples is not None:
        self.n_samples = samples.shape[0]
    else:
        raise Exception("Please provide either samples or pass number of "
                        "samples!")

    # Generate random samples if necessary
    Samples = self._get_sample() if samples is None else samples

    # Run the original model with the generated samples
    if outputs is None:
        outputs = self._eval_model(Samples, key_str='validSet')

    # Run the PCE model with the generated samples
    pce_outputs, _ = MetaModel.eval_metamodel(samples=Samples)

    self.rmse = {}
    self.valid_error = {}
    # Loop over the keys and compute RMSE error.
    for key in Model.Output.names:
        # Root mean square error
        self.rmse[key] = mean_squared_error(outputs[key], pce_outputs[key],
                                            squared=False,
                                            multioutput='raw_values')
        # Validation error
        self.valid_error[key] = (self.rmse[key]**2 / self.n_samples) / \
            np.var(outputs[key], ddof=1, axis=0)

        # Print a report table
        print("\n>>>>> Errors of {} <<<<<".format(key))
        print("\nIndex  |  RMSE   |  Validation Error")
        print('-'*35)
        print('\n'.join(f'{i+1}  |  {k:.3e}  |  {j:.3e}' for i, (k, j)
                        in enumerate(zip(self.rmse[key],
                                         self.valid_error[key]))))
    # Save error dicts in PCEModel object
    self.MetaModel.rmse = self.rmse
    self.MetaModel.valid_error = self.valid_error

    return self.rmse, self.valid_error
def plot_seq_design_diagnostics(self, ref_BME_KLD=None, save_fig=True)

Plots the Bayesian Model Evidence (BME) and Kullback-Leibler divergence (KLD) for the sequential design.

Parameters

ref_BME_KLD : array, optional
Reference BME and KLD. The default is None.
save_fig : bool, optional
Whether to save the figures. The default is True.

Returns

None.
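
A sketch, assuming the metamodel was built with a sequential experimental design (post as above; the reference values are purely illustrative):

# Plot the convergence of the stored diagnostics (LOO error, BME, KLD, ...)
post.plot_seq_design_diagnostics()

# Normalize the BME and KLD curves by reference values [ref_BME, ref_KLD]
post.plot_seq_design_diagnostics(ref_BME_KLD=[1.2e-3, 4.5])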

Expand source code
def plot_seq_design_diagnostics(self, ref_BME_KLD=None, save_fig=True):
    """
    Plots the Bayesian Model Evidence (BME) and Kullback-Leibler divergence
    (KLD) for the sequential design.

    Parameters
    ----------
    ref_BME_KLD : array, optional
        Reference BME and KLD. The default is `None`.
    save_fig : bool, optional
        Whether to save the figures. The default is `True`.

    Returns
    -------
    None.

    """
    PCEModel = self.MetaModel
    n_init_samples = PCEModel.ExpDesign.n_init_samples
    n_total_samples = PCEModel.ExpDesign.X.shape[0]

    if save_fig:
        newpath = f'Outputs_PostProcessing_{self.name}/'
        if not os.path.exists(newpath):
            os.makedirs(newpath)

        # create a PdfPages object
        pdf = PdfPages(f'./{newpath}/seqPCEModelDiagnostics.pdf')

    plotList = ['Modified LOO error', 'Validation error', 'KLD', 'BME',
                'RMSEMean', 'RMSEStd', 'Hellinger distance']
    seqList = [PCEModel.SeqModifiedLOO, PCEModel.seqValidError,
               PCEModel.SeqKLD, PCEModel.SeqBME, PCEModel.seqRMSEMean,
               PCEModel.seqRMSEStd, PCEModel.SeqDistHellinger]

    markers = ('x', 'o', 'd', '*', '+')
    colors = ('k', 'darkgreen', 'b', 'navy', 'darkred')

    # Plot the evolution of the diagnostic criteria of the
    # Sequential Experimental Design.
    for plotidx, plot in enumerate(plotList):
        fig, ax = plt.subplots()
        seq_dict = seqList[plotidx]
        name_util = list(seq_dict.keys())

        if len(name_util) == 0:
            continue

        # Box plot when Replications have been detected.
        if any(int(name.split("rep_", 1)[1]) > 1 for name in name_util):
            # Extract the values from dict
            sorted_seq_opt = {}
            # Number of replications
            n_reps = PCEModel.ExpDesign.nReprications

            # Get the list of utility function names
            # Handle if only one UtilityFunction is provided
            if not isinstance(PCEModel.ExpDesign.UtilityFunction, list):
                util_funcs = [PCEModel.ExpDesign.UtilityFunction]
            else:
                util_funcs = PCEModel.ExpDesign.UtilityFunction

            for util in util_funcs:
                sortedSeq = {}
                # min number of runs available from reps
                n_runs = min([seq_dict[f'{util}_rep_{i+1}'].shape[0]
                             for i in range(n_reps)])

                for runIdx in range(n_runs):
                    values = []
                    for key in seq_dict.keys():
                        if util in key:
                            values.append(seq_dict[key][runIdx].mean())
                    sortedSeq['SeqItr_'+str(runIdx)] = np.array(values)
                sorted_seq_opt[util] = sortedSeq

            # BoxPlot
            def draw_plot(data, labels, edge_color, fill_color, idx):
                pos = labels - (idx-1)
                bp = plt.boxplot(data, positions=pos, labels=labels,
                                 patch_artist=True, sym='', widths=0.75)
                elements = ['boxes', 'whiskers', 'fliers', 'means',
                            'medians', 'caps']
                for element in elements:
                    plt.setp(bp[element], color=edge_color[idx])

                for patch in bp['boxes']:
                    patch.set(facecolor=fill_color[idx])

            if PCEModel.ExpDesign.n_new_samples != 1:
                step1 = PCEModel.ExpDesign.n_new_samples
                step2 = 1
            else:
                step1 = 5
                step2 = 5
            edge_color = ['red', 'blue', 'green']
            fill_color = ['tan', 'cyan', 'lightgreen']
            plot_label = plot
            # Plot for different Utility Functions
            for idx, util in enumerate(util_funcs):
                all_errors = np.empty((n_reps, 0))

                for key in list(sorted_seq_opt[util].keys()):
                    errors = sorted_seq_opt.get(util, {}).get(key)[:, None]
                    all_errors = np.hstack((all_errors, errors))

                # Special cases for BME and KLD
                if plot == 'KLD' or plot == 'BME':
                    # BME convergence if refBME is provided
                    if ref_BME_KLD is not None:
                        if plot == 'BME':
                            refValue = ref_BME_KLD[0]
                            plot_label = r'$BME/BME^{Ref.}$'
                        if plot == 'KLD':
                            refValue = ref_BME_KLD[1]
                            plot_label = r'$D_{KL}[p(\theta|y_*),p(\theta)]' \
                                r' / D_{KL}^{Ref.}[p(\theta|y_*), ' \
                                r'p(\theta)]$'

                        # Difference between BME/KLD and the ref. values
                        all_errors = np.divide(all_errors,
                                               np.full((all_errors.shape),
                                                       refValue))

                        # Plot baseline for zero, i.e. no difference
                        plt.axhline(y=1.0, xmin=0, xmax=1, c='green',
                                    ls='--', lw=2)

                # Plot each UtilFuncs
                labels = np.arange(n_init_samples, n_total_samples+1, step1)
                draw_plot(all_errors[:, ::step2], labels, edge_color,
                          fill_color, idx)

            plt.xticks(labels, labels)
            # Set the major and minor locators
            ax.xaxis.set_major_locator(ticker.AutoLocator())
            ax.xaxis.set_minor_locator(ticker.AutoMinorLocator())
            ax.xaxis.grid(True, which='major', linestyle='-')
            ax.xaxis.grid(True, which='minor', linestyle='--')

            # Legend
            legend_elements = []
            for idx, util in enumerate(util_funcs):
                legend_elements.append(Patch(facecolor=fill_color[idx],
                                             edgecolor=edge_color[idx],
                                             label=util))
            plt.legend(handles=legend_elements[::-1], loc='best')

            if plot != 'BME' and plot != 'KLD':
                plt.yscale('log')
            plt.autoscale(True)
            plt.xlabel('\\# of training samples')
            plt.ylabel(plot_label)
            plt.title(plot)

            if save_fig:
                # save the current figure
                pdf.savefig(fig, bbox_inches='tight')
                # Destroy the current plot
                plt.clf()
                # Save arrays into files
                with open(f'./{newpath}/Seq{plot}.txt', 'w') as f:
                    f.write(str(sorted_seq_opt))
        else:
            for idx, name in enumerate(name_util):
                seq_values = seq_dict[name]
                if PCEModel.ExpDesign.n_new_samples != 1:
                    step = PCEModel.ExpDesign.n_new_samples
                else:
                    step = 1
                x_idx = np.arange(n_init_samples, n_total_samples+1, step)
                if n_total_samples not in x_idx:
                    x_idx = np.hstack((x_idx, n_total_samples))

                if plot == 'KLD' or plot == 'BME':
                    # BME convergence if refBME is provided
                    if ref_BME_KLD is not None:
                        if plot == 'BME':
                            refValue = ref_BME_KLD[0]
                            plot_label = r'$BME/BME^{Ref.}$'
                        if plot == 'KLD':
                            refValue = ref_BME_KLD[1]
                            plot_label = r'$D_{KL}[p(\theta|y_*),p(\theta)]' \
                                r' / D_{KL}^{Ref.}[p(\theta|y_*), ' \
                                r'p(\theta)]$'

                        # Difference between BME/KLD and the ref. values
                        values = np.divide(seq_values,
                                           np.full((seq_values.shape),
                                                   refValue))

                        # Plot baseline for zero, i.e. no difference
                        plt.axhline(y=1.0, xmin=0, xmax=1, c='green',
                                    ls='--', lw=2)

                        # Set the limits
                        plt.ylim([1e-1, 1e1])

                        # Create the plots
                        plt.semilogy(x_idx, values, marker=markers[idx],
                                     color=colors[idx], ls='--', lw=2,
                                     label=name.split("_rep", 1)[0])
                    else:
                        plot_label = plot

                        # Create the plots
                        plt.plot(x_idx, seq_values, marker=markers[idx],
                                 color=colors[idx], ls='--', lw=2,
                                 label=name.split("_rep", 1)[0])

                else:
                    plot_label = plot
                    seq_values = np.nan_to_num(seq_values)

                    # Plot the error evolution for each output
                    for i in range(seq_values.shape[1]):
                        plt.semilogy(x_idx, seq_values[:, i], ls='--',
                                     lw=2, marker=markers[idx],
                                     color=colors[idx], alpha=0.15)

                    plt.semilogy(x_idx, seq_values, marker=markers[idx],
                                 ls='--', lw=2, color=colors[idx],
                                 label=name.split("_rep", 1)[0])

            # Set the major and minor locators
            ax.xaxis.set_major_locator(ticker.AutoLocator())
            ax.xaxis.set_minor_locator(ticker.AutoMinorLocator())
            ax.xaxis.grid(True, which='major', linestyle='-')
            ax.xaxis.grid(True, which='minor', linestyle='--')

            ax.tick_params(axis='both', which='major', direction='in',
                           width=3, length=10)
            ax.tick_params(axis='both', which='minor', direction='in',
                           width=2, length=8)
            plt.xlabel('Number of runs')
            plt.ylabel(plot_label)
            plt.title(plot)
            plt.legend(frameon=True)

            if save_fig:
                # save the current figure
                pdf.savefig(fig, bbox_inches='tight')
                # Destroy the current plot
                plt.clf()

                # ---------------- Saving arrays into files ---------------
                np.save(f'./{newpath}/Seq{plot}.npy', seq_values)

    # Close the pdf
    pdf.close()
    return
def sobol_indices(self, xlabel='Time [s]', plot_type=None, save_fig=True)

Provides Sobol indices as a sensitivity measure to infer the importance of the input parameters. See Eq. 27 in [1] for more details. For the case with Principal component analysis refer to [2].

[1] Global sensitivity analysis: A flexible and efficient framework with an example from stochastic hydrogeology S. Oladyshkin, F.P. de Barros, W. Nowak https://doi.org/10.1016/j.advwatres.2011.11.001

[2] Nagel, J.B., Rieckermann, J. and Sudret, B., 2020. Principal component analysis and sparse polynomial chaos expansions for global sensitivity analysis and model calibration: Application to urban drainage simulation. Reliability Engineering & System Safety, 195, p.106737.

Parameters

xlabel : str, optional
Label of the x-axis. The default is 'Time [s]'.
plot_type : str, optional
Plot type. The default is None. This corresponds to line plot. Bar chart can be selected by bar.
save_fig : bool, optional
Whether to save the figures. The default is True.

Returns

sobol_cell : dict
Sobol indices.
total_sobol : dict
Total Sobol indices.
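
A usage sketch (post as above):

# Line plot of the total Sobol indices over the x values of each output;
# the indices are also written to CSV files in the output directory
sobol_cell, total_sobol = post.sobol_indices(xlabel='Time [s]')

# Bar-chart variant
sobol_cell, total_sobol = post.sobol_indices(plot_type='bar')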
Expand source code
def sobol_indices(self, xlabel='Time [s]', plot_type=None, save_fig=True):
    """
    Provides Sobol indices as a sensitivity measure to infer the importance
    of the input parameters. See Eq. 27 in [1] for more details. For the
    case with Principal component analysis refer to [2].

    [1] Global sensitivity analysis: A flexible and efficient framework
    with an example from stochastic hydrogeology S. Oladyshkin, F.P.
    de Barros, W. Nowak  https://doi.org/10.1016/j.advwatres.2011.11.001

    [2] Nagel, J.B., Rieckermann, J. and Sudret, B., 2020. Principal
    component analysis and sparse polynomial chaos expansions for global
    sensitivity analysis and model calibration: Application to urban
    drainage simulation. Reliability Engineering & System Safety, 195,
    p.106737.

    Parameters
    ----------
    xlabel : str, optional
        Label of the x-axis. The default is `'Time [s]'`.
    plot_type : str, optional
        Plot type. The default is `None`. This corresponds to line plot.
        Bar chart can be selected by `bar`.
    save_fig : bool, optional
        Whether to save the figures. The default is `True`.

    Returns
    -------
    sobol_cell: dict
        Sobol indices.
    total_sobol: dict
        Total Sobol indices.

    """
    # Extract the necessary variables
    PCEModel = self.MetaModel
    basis_dict = PCEModel.basis_dict
    coeffs_dict = PCEModel.coeffs_dict
    n_params = PCEModel.n_params
    max_order = np.max(PCEModel.pce_deg)
    self.sobol_cell = {}
    self.total_sobol = {}

    for Output in PCEModel.ModelObj.Output.names:

        n_meas_points = len(coeffs_dict[Output])

        # Initialize the (cell) array containing the (total) Sobol indices.
        sobol_array = dict.fromkeys(range(1, max_order+1), [])
        sobol_cell_array = dict.fromkeys(range(1, max_order+1), [])

        for i_order in range(1, max_order+1):
            n_comb = math.comb(n_params, i_order)

            sobol_cell_array[i_order] = np.zeros((n_comb, n_meas_points))

        total_sobol_array = np.zeros((n_params, n_meas_points))

        # Initialize the array to store the total variance per measurement point
        TotalVariance = np.zeros((n_meas_points))

        # Loop over all measurement points and calculate the Sobol indices
        for pIdx in range(n_meas_points):

            # Extract the basis indices (alpha) and coefficients
            Basis = basis_dict[Output][f'y_{pIdx+1}']

            try:
                clf_poly = PCEModel.clf_poly[Output][f'y_{pIdx+1}']
                PCECoeffs = clf_poly.coef_
            except (AttributeError, KeyError):
                PCECoeffs = coeffs_dict[Output][f'y_{pIdx+1}']

            # Compute total variance
            TotalVariance[pIdx] = np.sum(np.square(PCECoeffs[1:]))

            nzidx = np.where(PCECoeffs != 0)[0]
            # Set all the Sobol indices equal to zero in the presence of a
            # null output.
            if len(nzidx) == 0:
                # This is buggy.
                for i_order in range(1, max_order+1):
                    sobol_cell_array[i_order][:, pIdx] = 0

            # Otherwise compute them by summing well-chosen coefficients
            else:
                nz_basis = Basis[nzidx]
                for i_order in range(1, max_order+1):
                    idx = np.where(np.sum(nz_basis > 0, axis=1) == i_order)
                    subbasis = nz_basis[idx]
                    Z = np.array(list(combinations(range(n_params), i_order)))

                    for q in range(Z.shape[0]):
                        Zq = Z[q]
                        subsubbasis = subbasis[:, Zq]
                        subidx = np.prod(subsubbasis, axis=1) > 0
                        sum_ind = nzidx[idx[0][subidx]]
                        if TotalVariance[pIdx] == 0.0:
                            sobol_cell_array[i_order][q, pIdx] = 0.0
                        else:
                            sobol = np.sum(np.square(PCECoeffs[sum_ind]))
                            sobol /= TotalVariance[pIdx]
                            sobol_cell_array[i_order][q, pIdx] = sobol

                # Compute the TOTAL Sobol indices.
                for ParIdx in range(n_params):
                    idx = nz_basis[:, ParIdx] > 0
                    sum_ind = nzidx[idx]

                    if TotalVariance[pIdx] == 0.0:
                        total_sobol_array[ParIdx, pIdx] = 0.0
                    else:
                        sobol = np.sum(np.square(PCECoeffs[sum_ind]))
                        sobol /= TotalVariance[pIdx]
                        total_sobol_array[ParIdx, pIdx] = sobol

            # ----- if PCA selected: Compute covariance -----
            if PCEModel.dim_red_method.lower() == 'pca':
                cov_Z_p_q = np.zeros((n_params))
                # Extract the basis indices (alpha) and coefficients for 
                # next component
                if pIdx < n_meas_points-1:
                    nextBasis = basis_dict[Output][f'y_{pIdx+2}']

                    try:
                        clf_poly = PCEModel.clf_poly[Output][f'y_{pIdx+2}']
                        nextPCECoeffs = clf_poly.coef_
                    except (AttributeError, KeyError):
                        nextPCECoeffs = coeffs_dict[Output][f'y_{pIdx+2}']

                    # Choose the common non-zero basis
                    mask = (Basis[:, None] == nextBasis).all(-1).any(-1)
                    similar_basis = Basis[mask]
                    # Compute the TOTAL Sobol indices.
                    for ParIdx in range(n_params):
                        idx = similar_basis[:, ParIdx] > 0
                        try:
                            sum_is = nzidx[idx]
                            cov_Z_p_q[ParIdx] = np.sum(PCECoeffs[sum_ind] *
                                                       nextPCECoeffs[sum_is])
                        except (IndexError, ValueError):
                            cov_Z_p_q[ParIdx] = 0.0

        # Compute the Sobol indices according to [2]
        if PCEModel.dim_red_method.lower() == 'pca':
            n_c_points = PCEModel.ExpDesign.Y[Output].shape[1]
            PCA = PCEModel.pca[Output]
            compPCA = PCA.components_
            nComp = compPCA.shape[0]
            var_Z_p = PCA.explained_variance_

            # Extract the Sobol indices of the components
            for i_order in range(1, max_order+1):
                n_comb = math.comb(n_params, i_order)
                sobol_array[i_order] = np.zeros((n_comb, n_c_points))
                Z = np.array(list(combinations(range(n_params), i_order)))

                for q in range(Z.shape[0]):
                    S_Z_i = sobol_cell_array[i_order][q]

                    for tIdx in range(n_c_points):
                        var_Y_t = np.var(PCEModel.ExpDesign.Y[Output][:, tIdx])
                        if var_Y_t == 0.0:
                            term1, term2 = 0.0, 0.0
                        else:
                            term1 = np.sum([S_Z_i[i]*(var_Z_p[i]*(compPCA[i, tIdx]**2)/var_Y_t) for i in range(nComp)])

                            # Term 2
                            # cov_Z_p_q = np.ones((nComp))# TODO: from coeffs
                            Phi_t_p = compPCA[:nComp-1]
                            Phi_t_q = compPCA
                            term2 = 2 * np.sum([cov_Z_p_q[ParIdx] * Phi_t_p[i,tIdx] * Phi_t_q[i,tIdx]/var_Y_t for i in range(nComp-1)])

                        sobol_array[i_order][q, tIdx] = term1 #+ term2

            # Compute the TOTAL Sobol indices.
            total_sobol = np.zeros((n_params, n_c_points))
            for ParIdx in range(n_params):
                S_Z_i = total_sobol_array[ParIdx]

                for tIdx in range(n_c_points):
                    var_Y_t = np.var(PCEModel.ExpDesign.Y[Output][:, tIdx])
                    if var_Y_t == 0.0:
                        term1, term2 = 0.0, 0.0
                    else:
                        term1 = 0
                        for i in range(nComp):
                            term1 += S_Z_i[i] * var_Z_p[i] * \
                                (compPCA[i, tIdx]**2) / var_Y_t

                        # Term 2
                        # cov_Z_p_q = np.ones((nComp))# TODO: from coeffs
                        Phi_t_p = compPCA[:nComp-1]
                        Phi_t_q = compPCA
                        term2 = 0
                        for i in range(nComp-1):
                            term2 += cov_Z_p_q[ParIdx] * Phi_t_p[i, tIdx] \
                                * Phi_t_q[i, tIdx] / var_Y_t
                        term2 *= 2

                    total_sobol[ParIdx, tIdx] = term1 + term2

            self.sobol_cell[Output] = sobol_array
            self.total_sobol[Output] = total_sobol
        else:
            self.sobol_cell[Output] = sobol_cell_array
            self.total_sobol[Output] = total_sobol_array

    # ---------------- Plot -----------------------
    par_names = PCEModel.ExpDesign.par_names
    x_values_orig = PCEModel.ExpDesign.x_values

    cases = ['']

    for case in cases:
        newpath = (f'Outputs_PostProcessing_{self.name}/')
        if not os.path.exists(newpath):
            os.makedirs(newpath)

        # Prefix for the file names (only used for validation cases)
        name = case + '_' if 'Valid' in cases else ''

        if save_fig:
            # create a PdfPages object
            pdf = PdfPages('./' + newpath + name + 'Sobol_indices.pdf')

        fig = plt.figure()

        for outIdx, Output in enumerate(PCEModel.ModelObj.Output.names):

            # Extract total Sobol indices
            total_sobol = self.total_sobol[Output]

            # Extract a list of x values
            if type(x_values_orig) is dict:
                x = x_values_orig[Output]
            else:
                x = x_values_orig

            if plot_type == 'bar':
                ax = fig.add_axes([0, 0, 1, 1])
                dict1 = {xlabel: x}
                dict2 = {param: sobolIndices for param, sobolIndices
                         in zip(par_names, total_sobol)}

                df = pd.DataFrame({**dict1, **dict2})
                df.plot(x=xlabel, y=par_names, kind="bar", ax=ax, rot=0,
                        colormap='Dark2')
                ax.set_ylabel('Total Sobol indices, $S^T$')

            else:
                for i, sobolIndices in enumerate(total_sobol):
                    plt.plot(x, sobolIndices, label=par_names[i],
                             marker='x', lw=2.5)

                plt.ylabel('Total Sobol indices, $S^T$')
                plt.xlabel(xlabel)

            plt.title(f'Sensitivity analysis of {Output}')
            if plot_type != 'bar':
                plt.legend(loc='best', frameon=True)

            # Save indices
            np.savetxt(f'./{newpath}{name}totalsobol_' +
                       Output.replace('/', '_') + '.csv',
                       total_sobol.T, delimiter=',',
                       header=','.join(par_names), comments='')

            if save_fig:
                # save the current figure
                pdf.savefig(fig, bbox_inches='tight')

                # Destroy the current plot
                plt.clf()

        if save_fig:
            pdf.close()

    return self.sobol_cell, self.total_sobol
def check_reg_quality(self, n_samples=1000, samples=None, save_fig=True)

Checks the quality of the metamodel for single output models based on: https://towardsdatascience.com/how-do-you-check-the-quality-of-your-regression-model-in-python-fa61759ff685

Parameters

n_samples : int, optional
Number of parameter sets to use for the check. The default is 1000.
samples : array of shape (n_samples, n_params), optional
Parameter sets to use for the check. The default is None.
save_fig : bool, optional
Whether to save the figures. The default is True.

Returns

None.
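
A minimal usage sketch (names are illustrative; post_proc is assumed to be a PostProcessing instance whose attached model can still be evaluated):

>>> post_proc.check_reg_quality(n_samples=500)

This evaluates the original model and the metamodel on 500 random parameter sets and, for each output, produces residuals-vs-variables, residuals-vs-fitted, residual-histogram (with a Shapiro-Wilk normality annotation) and Q-Q plots under Outputs_PostProcessing_<name>/.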

Expand source code
def check_reg_quality(self, n_samples=1000, samples=None, save_fig=True):
    """
    Checks the quality of the metamodel for single output models based on:
    https://towardsdatascience.com/how-do-you-check-the-quality-of-your-regression-model-in-python-fa61759ff685


    Parameters
    ----------
    n_samples : int, optional
        Number of parameter sets to use for the check. The default is 1000.
    samples : array of shape (n_samples, n_params), optional
        Parameter sets to use for the check. The default is None.
    save_fig : bool, optional
        Whether to save the figures. The default is True.

    Returns
    -------
    None.

    """
    MetaModel = self.MetaModel

    if samples is None:
        self.n_samples = n_samples
        samples = self._get_sample()
    else:
        self.n_samples = samples.shape[0]

    # Evaluate the original and the surrogate model
    y_val = self._eval_model(samples, key_str='valid')
    y_pce_val, _ = MetaModel.eval_metamodel(samples=samples)

    # Open a pdf for the plots
    if save_fig:
        newpath = (r'Outputs_PostProcessing_{0}/'.format(self.name))
        if not os.path.exists(newpath):
            os.makedirs(newpath)

    # Loop over the outputs and create the diagnostic plots
    for key in y_pce_val.keys():

        y_pce_val_ = y_pce_val[key]
        y_val_ = y_val[key]

        # ------ Residuals vs. predicting variables ------
        # Check the assumptions of linearity and independence
        fig1 = plt.figure()
        plt.title(key+": Residuals vs. Predicting variables")
        residuals = y_val_ - y_pce_val_
        plt.scatter(x=y_val_, y=residuals, color='blue', edgecolor='k')
        plt.grid(True)
        xmin, xmax = min(y_val_), max(y_val_)
        plt.hlines(y=0, xmin=xmin*0.9, xmax=xmax*1.1, color='red', lw=3,
                   linestyle='--')
        plt.xlabel(key)
        plt.ylabel('Residuals')
        plt.show()

        if save_fig:
            # save the current figure
            fig1.savefig(f'./{newpath}/Residuals_vs_PredVariables.pdf',
                         bbox_inches='tight')
            # Destroy the current plot
            plt.clf()

        # ------ Fitted vs. residuals ------
        # Check the assumptions of linearity and independence
        fig2 = plt.figure()
        plt.title(key+": Residuals vs. fitted values")
        plt.scatter(x=y_pce_val_, y=residuals, color='blue', edgecolor='k')
        plt.grid(True)
        xmin, xmax = min(y_val_), max(y_val_)
        plt.hlines(y=0, xmin=xmin*0.9, xmax=xmax*1.1, color='red', lw=3,
                   linestyle='--')
        plt.xlabel(key)
        plt.ylabel('Residuals')
        plt.show()

        if save_fig:
            # save the current figure
            fig2.savefig(f'./{newpath}/Fitted_vs_Residuals.pdf',
                         bbox_inches='tight')
            # Destroy the current plot
            plt.clf()

        # ------ Histogram of normalized residuals ------
        fig3 = plt.figure()
        resid_norm = residuals / (max(residuals)-min(residuals))
        plt.hist(resid_norm, bins=20, edgecolor='k')
        plt.ylabel('Count')
        plt.xlabel('Normalized residuals')
        plt.title(f"{key}: Histogram of normalized residuals")

        # Normality (Shapiro-Wilk) test of the residuals
        ax = plt.gca()
        _, p = stats.shapiro(residuals)
        if p < 0.01:
            annText = "The normality assumption may not hold."
        else:
            annText = "The residuals seem to come from a Gaussian distribution."
        at = AnchoredText(annText, prop=dict(size=30), frameon=True,
                          loc='upper left')
        at.patch.set_boxstyle("round,pad=0.,rounding_size=0.2")
        ax.add_artist(at)

        plt.show()

        if save_fig:
            # save the current figure
            fig3.savefig(f'./{newpath}/Hist_NormResiduals.pdf',
                         bbox_inches='tight')
            # Destroy the current plot
            plt.clf()

        # ------ Q-Q plot of the normalized residuals ------
        plt.figure()
        fig4 = qqplot(resid_norm, line='45', fit=True)
        plt.xticks()
        plt.yticks()
        plt.xlabel("Theoretical quantiles")
        plt.ylabel("Sample quantiles")
        plt.title(key+": Q-Q plot of normalized residuals")
        plt.grid(True)
        plt.show()

        if save_fig:
            # save the current figure
            fig4.savefig(f'./{newpath}/QQPlot_NormResiduals.pdf',
                         bbox_inches='tight')
            # Destroy the current plot
            plt.clf()
def eval_pce_model_3d(self, save_fig=True)
Expand source code
def eval_pce_model_3d(self, save_fig=True):
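    """
    Visualizes the metamodel and the original model responses as 3D
    surface plots over a fixed 2D parameter grid. Note that the grid
    bounds are currently hard-coded (see the TODO below), so this method
    assumes a two-parameter model.

    Parameters
    ----------
    save_fig : bool, optional
        Whether to save the figures. The default is `True`.
    """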

    self.n_samples = 1000

    PCEModel = self.MetaModel
    Model = self.MetaModel.ModelObj
    n_samples = self.n_samples

    # Create 3D-Grid
    # TODO: Make it general
    x = np.linspace(-5, 10, n_samples)
    y = np.linspace(0, 15, n_samples)

    X, Y = np.meshgrid(x, y)
    PCE_Z = np.zeros((self.n_samples, self.n_samples))
    Model_Z = np.zeros((self.n_samples, self.n_samples))

    for idxMesh in range(self.n_samples):
        sample_mesh = np.vstack((X[:, idxMesh], Y[:, idxMesh])).T

        univ_p_val = PCEModel.univ_basis_vals(sample_mesh)

        for Outkey, ValuesDict in PCEModel.coeffs_dict.items():

            pce_out_mean = np.zeros((len(sample_mesh), len(ValuesDict)))
            pce_out_std = np.zeros((len(sample_mesh), len(ValuesDict)))
            model_outs = np.zeros((len(sample_mesh), len(ValuesDict)))

            for Inkey, InIdxValues in ValuesDict.items():
                idx = int(Inkey.split('_')[1]) - 1
                basis_deg_ind = PCEModel.basis_dict[Outkey][Inkey]
                clf_poly = PCEModel.clf_poly[Outkey][Inkey]

                PSI_Val = PCEModel.create_psi(basis_deg_ind, univ_p_val)

                # Prediction with error bars
                y_mean, y_std = clf_poly.predict(PSI_Val, return_std=True)

                pce_out_mean[:, idx] = y_mean
                pce_out_std[:, idx] = y_std

                # Model evaluation
                model_out_dict, _ = Model.run_model_parallel(sample_mesh,
                                                             key_str='Valid3D')
                model_outs[:, idx] = model_out_dict[Outkey].T

            PCE_Z[:, idxMesh] = y_mean
            Model_Z[:, idxMesh] = model_outs[:, 0]

    # ---------------- 3D plot for PCEModel -----------------------
    fig_PCE = plt.figure()
    ax = plt.axes(projection='3d')
    ax.plot_surface(X, Y, PCE_Z, rstride=1, cstride=1,
                    cmap='viridis', edgecolor='none')
    ax.set_title('PCEModel')
    ax.set_xlabel('$x_1$')
    ax.set_ylabel('$x_2$')
    ax.set_zlabel('$f(x_1,x_2)$')

    plt.grid()
    plt.show()

    if save_fig:
        #  Saving the figure
        newpath = f'Outputs_PostProcessing_{self.name}/'
        if not os.path.exists(newpath):
            os.makedirs(newpath)

        # save the figure to file
        fig_PCE.savefig(f'./{newpath}/3DPlot_PCEModel.pdf', format="pdf",
                        bbox_inches='tight')
        plt.close(fig_PCE)

    # ---------------- 3D plot for Model -----------------------
    fig_Model = plt.figure()
    ax = plt.axes(projection='3d')
    ax.plot_surface(X, Y, Model_Z, rstride=1, cstride=1,
                    cmap='viridis', edgecolor='none')
    ax.set_title('Model')
    ax.set_xlabel('$x_1$')
    ax.set_ylabel('$x_2$')
    ax.set_zlabel('$f(x_1,x_2)$')

    plt.grid()
    plt.show()

    if save_fig:
        # Save the figure
        fig_Model.savefig(f'./{newpath}/3DPlot_Model.pdf', format="pdf",
                          bbox_inches='tight')
        plt.close(fig_Model)

    return
class BayesInference (MetaModel, discrepancy=None, emulator=True, name='Calib', bootstrap=False, req_outputs=None, selected_indices=None, samples=None, n_samples=500000, measured_data=None, inference_method='rejection', mcmc_params=None, bayes_loocv=False, n_bootstrap_itrs=1, perturbed_data=[], bootstrap_noise=0.05, plot_post_pred=True, plot_map_pred=False, max_a_posteriori='mean', corner_title_fmt='.3e')

A class to perform Bayesian Analysis.

Attributes

MetaModel : obj
Meta model object.
discrepancy : obj
The discrepancy object for the sigma2s, i.e. the diagonal entries of the variance matrix for a multivariate normal likelihood.
name : str, optional
The type of analysis, either calibration (Calib) or validation (Valid). The default is 'Calib'.
emulator : bool, optional
Analysis with emulator (MetaModel). The default is True.
bootstrap : bool, optional
Bootstrap the analysis. The default is False.
req_outputs : list, optional
The list of requested outputs to be used for the analysis. The default is None. If None, all the defined outputs of the model object are used.
selected_indices : dict, optional
A dictionary with the selected indices of each model output. The default is None. If None, all measurement points are used in the analysis.
samples : array of shape (n_samples, n_params), optional
The samples to be used in the analysis. The default is None. If None, the samples are drawn from the probabilistic input parameter object of the MetaModel object.
n_samples : int, optional
Number of samples to be used in the analysis. The default is 500000. If samples is not None, this argument will be assigned based on the number of samples given.
measured_data : dict, optional
A dictionary containing the observation data. The default is None. If None, the observations defined in the Model object of the MetaModel are used.
inference_method : str, optional
A method for approximating the posterior distribution in the Bayesian inference step. The default is 'rejection', which stands for rejection sampling. A Markov Chain Monte Carlo sampler can be simply selected by passing 'MCMC'.
mcmc_params : dict, optional

A dictionary with args required for the Bayesian inference with MCMC. The default is None.

Pass the mcmc_params like the following:

>>> mcmc_params:{
    'init_samples': None,  # initial samples
    'n_walkers': 100,  # number of walkers (chain)
    'n_steps': 100000,  # number of maximum steps
    'n_burn': 200,  # number of burn-in steps
    'moves': None,  # Moves for the emcee sampler
    'multiprocessing': False,  # multiprocessing
    'verbose': False # verbosity
    }

The items shown above are the default values. If any parameter is not defined, the default value will be assigned to it.

bayes_loocv : bool, optional
Bayesian Leave-one-out Cross Validation. The default is False. If True, the LOOCV procedure is used to estimate the Bayesian model evidence (BME).
n_bootstrap_itrs : int, optional
Number of bootstrap iterations. The default is 1. If bayes_loocv is True, this is set equal to the total length of the observation data set.
perturbed_data : array of shape (n_bootstrap_itrs, n_obs), optional
User defined perturbed data. The default is [].
bootstrap_noise : float, optional
A noise level to perturb the data set. The default is 0.05.
plot_post_pred : bool, optional
Plot posterior predictive plots. The default is True.
plot_map_pred : bool, optional
Plot the model outputs vs the metamodel predictions for the maximum a posteriori (defined as max_a_posteriori) parameter set. The default is False.
max_a_posteriori : str, optional
Maximum a posteriori. 'mean' and 'mode' are available. The default is 'mean'.
corner_title_fmt : str, optional
Title format for the posterior distribution plot with python package corner. The default is '.3e'.
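
A minimal usage sketch (names are illustrative; meta_model is a trained MetaModel and disc a Discrepancy object):

>>> from bayesvalidrox import BayesInference
>>> bayes = BayesInference(meta_model, discrepancy=disc,
...                        inference_method='MCMC', n_samples=10000)
>>> bayes = bayes.create_inference()
>>> bayes.posterior_df.describe()  # posterior parameter summary

create_inference() returns the object itself, with the posterior samples stored in posterior_df and, if bootstrapping is enabled, the log-BME values in log_BME.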
Expand source code
class BayesInference:
    """
    A class to perform Bayesian Analysis.


    Attributes
    ----------
    MetaModel : obj
        Meta model object.
    discrepancy : obj
        The discrepancy object for the sigma2s, i.e. the diagonal entries
        of the variance matrix for a multivariate normal likelihood.
    name : str, optional
        The type of analysis, either calibration (`Calib`) or validation
        (`Valid`). The default is `'Calib'`.
    emulator : bool, optional
        Analysis with emulator (MetaModel). The default is `True`.
    bootstrap : bool, optional
        Bootstrap the analysis. The default is `False`.
    req_outputs : list, optional
        The list of requested outputs to be used for the analysis.
        The default is `None`. If `None`, all the defined outputs of the
        model object are used.
    selected_indices : dict, optional
        A dictionary with the selected indices of each model output. The
        default is `None`. If `None`, all measurement points are used in the
        analysis.
    samples : array of shape (n_samples, n_params), optional
        The samples to be used in the analysis. The default is `None`. If
        `None`, the samples are drawn from the probabilistic input parameter
        object of the MetaModel object.
    n_samples : int, optional
        Number of samples to be used in the analysis. The default is `500000`.
        If samples is not `None`, this argument will be assigned based on the
        number of samples given.
    measured_data : dict, optional
        A dictionary containing the observation data. The default is `None`.
        If `None`, the observations defined in the Model object of the
        MetaModel are used.
    inference_method : str, optional
        A method for approximating the posterior distribution in the Bayesian
        inference step. The default is `'rejection'`, which stands for
        rejection sampling. A Markov Chain Monte Carlo sampler can be simply
        selected by passing `'MCMC'`.
    mcmc_params : dict, optional
        A dictionary with args required for the Bayesian inference with
        `MCMC`. The default is `None`.

        Pass the mcmc_params like the following:

            >>> mcmc_params:{
                'init_samples': None,  # initial samples
                'n_walkers': 100,  # number of walkers (chain)
                'n_steps': 100000,  # number of maximum steps
                'n_burn': 200,  # number of burn-in steps
                'moves': None,  # Moves for the emcee sampler
                'multiprocessing': False,  # multiprocessing
                'verbose': False # verbosity
                }
        The items shown above are the default values. If any parameter is
        not defined, the default value will be assigned to it.
    bayes_loocv : bool, optional
        Bayesian Leave-one-out Cross Validation. The default is `False`. If
        `True`, the LOOCV procedure is used to estimate the Bayesian model
        evidence (BME).
    n_bootstrap_itrs : int, optional
        Number of bootstrap iterations. The default is `1`. If bayes_loocv is
        `True`, this is set equal to the total length of the observation
        data set.
    perturbed_data : array of shape (n_bootstrap_itrs, n_obs), optional
        User defined perturbed data. The default is `[]`.
    bootstrap_noise : float, optional
        A noise level to perturb the data set. The default is `0.05`.
    plot_post_pred : bool, optional
        Plot posterior predictive plots. The default is `True`.
    plot_map_pred : bool, optional
        Plot the model outputs vs the metamodel predictions for the maximum
        a posteriori (defined as `max_a_posteriori`) parameter set. The
        default is `False`.
    max_a_posteriori : str, optional
        Maximum a posteriori. `'mean'` and `'mode'` are available. The default
        is `'mean'`.
    corner_title_fmt : str, optional
        Title format for the posterior distribution plot with python
        package `corner`. The default is `'.3e'`.

    """

    def __init__(self, MetaModel, discrepancy=None, emulator=True,
                 name='Calib', bootstrap=False, req_outputs=None,
                 selected_indices=None, samples=None, n_samples=500000,
                 measured_data=None, inference_method='rejection',
                 mcmc_params=None, bayes_loocv=False, n_bootstrap_itrs=1,
                 perturbed_data=[], bootstrap_noise=0.05, plot_post_pred=True,
                 plot_map_pred=False, max_a_posteriori='mean',
                 corner_title_fmt='.3e'):

        self.MetaModel = MetaModel
        self.Discrepancy = discrepancy
        self.emulator = emulator
        self.name = name
        self.bootstrap = bootstrap
        self.req_outputs = req_outputs
        self.selected_indices = selected_indices
        self.samples = samples
        self.n_samples = n_samples
        self.measured_data = measured_data
        self.inference_method = inference_method
        self.mcmc_params = mcmc_params
        self.perturbed_data = perturbed_data
        self.bayes_loocv = bayes_loocv
        self.n_bootstrap_itrs = n_bootstrap_itrs
        self.bootstrap_noise = bootstrap_noise
        self.plot_post_pred = plot_post_pred
        self.plot_map_pred = plot_map_pred
        self.max_a_posteriori = max_a_posteriori
        self.corner_title_fmt = corner_title_fmt

    # -------------------------------------------------------------------------
    def create_inference(self):
        """
        Starts the inference.

        Returns
        -------
        BayesInference : obj
            The Bayes inference object.

        """

        # Set some variables
        MetaModel = self.MetaModel
        Model = MetaModel.ModelObj
        n_params = MetaModel.n_params
        output_names = Model.Output.names
        par_names = MetaModel.ExpDesign.par_names

        # Use the user-provided samples if given; otherwise draw from the prior.
        if self.samples is None:
            self.samples = MetaModel.ExpDesign.generate_samples(
                self.n_samples, 'random')
        else:
            try:
                samples = self.samples.values
            except AttributeError:
                samples = self.samples

            # Drop any additional Sigma2 columns
            self.samples = samples[:, :n_params]

            # Update number of samples
            self.n_samples = self.samples.shape[0]

        # ---------- Preparation of observation data ----------
        # Read observation data and perturb it if requested.
        if self.measured_data is None:
            self.measured_data = Model.read_observation(case=self.name)
        # Convert measured_data to a data frame
        if not isinstance(self.measured_data, pd.DataFrame):
            self.measured_data = pd.DataFrame(self.measured_data)

        # Extract the total number of measurement points
        if self.name.lower() == 'calib':
            self.n_tot_measurement = Model.n_obs
        else:
            self.n_tot_measurement = Model.n_obs_valid

        # Find measurement error (if not given) for post predictive plot
        if not hasattr(self, 'measurement_error'):
            if isinstance(self.Discrepancy, dict):
                Disc = self.Discrepancy['known']
            else:
                Disc = self.Discrepancy
            if isinstance(Disc.parameters, dict):
                self.measurement_error = {k: np.sqrt(Disc.parameters[k]) for k
                                          in Disc.parameters.keys()}
            else:
                try:
                    self.measurement_error = np.sqrt(Disc.parameters)
                except TypeError:
                    pass

        # ---------- Preparation of variance for covariance matrix ----------
        # Independent and identically distributed
        total_sigma2 = dict()
        opt_sigma_flag = isinstance(self.Discrepancy, dict)
        opt_sigma = None
        for key_idx, key in enumerate(output_names):

            # Find opt_sigma
            if opt_sigma_flag and opt_sigma is None:
                # Option A: known error with unknown bias term
                opt_sigma = 'A'
                known_discrepancy = self.Discrepancy['known']
                self.Discrepancy = self.Discrepancy['infer']
                sigma2 = np.array(known_discrepancy.parameters[key])

            elif opt_sigma == 'A' or self.Discrepancy.parameters is not None:
                # Option B: The sigma2 is known (no bias term)
                if opt_sigma == 'A':
                    sigma2 = np.array(known_discrepancy.parameters[key])
                else:
                    opt_sigma = 'B'
                    sigma2 = np.array(self.Discrepancy.parameters[key])

            elif not isinstance(self.Discrepancy.InputDisc, str):
                # Option C: The sigma2 is unknown (bias term including error)
                opt_sigma = 'C'
                self.Discrepancy.opt_sigma = opt_sigma
                n_measurement = self.measured_data[key].values.shape
                sigma2 = np.zeros((n_measurement[0]))

            total_sigma2[key] = sigma2

            self.Discrepancy.opt_sigma = opt_sigma
            self.Discrepancy.total_sigma2 = total_sigma2

        # If inferred sigma2s obtained from e.g. calibration are given
        try:
            self.sigma2s = self.Discrepancy.get_sample(self.n_samples)
        except Exception:
            pass

        # ---------------- Bootstrap & TOM --------------------
        if self.bootstrap or self.bayes_loocv:
            if len(self.perturbed_data) == 0:
                # Add zero-mean noise to the observation data
                self.perturbed_data = self._perturb_data(
                    self.measured_data, output_names
                    )
            else:
                self.n_bootstrap_itrs = len(self.perturbed_data)

            # -------- Model Discrepancy -----------
            if hasattr(self, 'error_model') and self.error_model \
               and self.name.lower() != 'calib':
                # Select posterior mean as MAP
                MAP_theta = self.samples.mean(axis=0).reshape((1, n_params))
                # MAP_theta = stats.mode(self.samples,axis=0)[0]

                # Evaluate the (meta-)model at the MAP
                y_MAP, y_std_MAP = MetaModel.eval_metamodel(samples=MAP_theta)

                # Train a GPR meta-model using MAP
                self.error_MetaModel = MetaModel.create_model_error(
                    self.bias_inputs, y_MAP, Name=self.name
                    )

            # -----------------------------------------------------
            # ----- Loop over the perturbed observation data ------
            # -----------------------------------------------------
            # Initialize arrays
            logLikelihoods = np.zeros((self.n_samples, self.n_bootstrap_itrs),
                                      dtype=np.float16)
            BME_Corr = np.zeros((self.n_bootstrap_itrs))
            log_BME = np.zeros((self.n_bootstrap_itrs))
            KLD = np.zeros((self.n_bootstrap_itrs))
            inf_entropy = np.zeros((self.n_bootstrap_itrs))

            # Compute the prior predictions by evaluating the MetaModel
            if self.emulator:
                y_hat, y_std = MetaModel.eval_metamodel(samples=self.samples)
                self.__mean_pce_prior_pred = y_hat
                self._std_pce_prior_pred = y_std

                # Correct the predictions with Model discrepancy
                if hasattr(self, 'error_model') and self.error_model:
                    y_hat_corr, y_std = self.error_MetaModel.eval_model_error(
                        self.bias_inputs, self.__mean_pce_prior_pred
                        )
                    self.__mean_pce_prior_pred = y_hat_corr
                    self._std_pce_prior_pred = y_std

                # Surrogate model's error using RMSE of test data
                if hasattr(MetaModel, 'rmse'):
                    surrError = MetaModel.rmse
                else:
                    surrError = None

            else:
                # No metamodel error estimate for the original model
                surrError = None
                # Evaluate the original model
                self.__model_prior_pred = self._eval_model(
                    samples=self.samples, key='PriorPred'
                    )

            # Start the likelihood-BME computations for the perturbed data
            for itr_idx, data in tqdm(
                    enumerate(self.perturbed_data), ascii=True,
                    desc="Bootstrapping the BME calculations"
                    ):

                # ---------------- Likelihood calculation ----------------
                if self.emulator:
                    model_evals = self.__mean_pce_prior_pred
                else:
                    model_evals = self.__model_prior_pred

                # Leave one out
                if self.bayes_loocv:
                    self.selected_indices = np.nonzero(data)[0]

                # Split the flat perturbed-data vector into per-output chunks
                nobs = list(self.measured_data.count().values[1:])
                numbers = list(map(sum, zip([0] + nobs, nobs)))
                indices = list(zip([0] + numbers, numbers))
                data_dict = {
                    output_names[i]: data[j:k] for i, (j, k) in
                    enumerate(indices)
                    }

                # Unknown sigma2
                if opt_sigma == 'C' or hasattr(self, 'sigma2s'):
                    logLikelihoods[:, itr_idx] = self.normpdf(
                        model_evals, data_dict, total_sigma2,
                        sigma2=self.sigma2s, std=surrError
                        )
                else:
                    # known sigma2
                    logLikelihoods[:, itr_idx] = self.normpdf(
                        model_evals, data_dict, total_sigma2,
                        std=surrError
                        )

                # ---------------- BME Calculations ----------------
                # BME (log)
                log_BME[itr_idx] = np.log(
                    np.nanmean(np.exp(logLikelihoods[:, itr_idx],
                                      dtype=np.float128))
                    )

                # Rejection Step
                # Random numbers between 0 and 1
                unif = np.random.rand(1, self.n_samples)[0]

                # Reject poorly performing prior samples
                Likelihoods = np.exp(logLikelihoods[:, itr_idx],
                                     dtype=np.float64)
                accepted = (Likelihoods/np.max(Likelihoods)) >= unif
                posterior = self.samples[accepted]

                # Posterior-based expectation of likelihoods
                postExpLikelihoods = np.mean(
                    logLikelihoods[:, itr_idx][accepted]
                    )

                # Posterior-based expectation of prior densities
                postExpPrior = np.mean(
                    np.log([MetaModel.ExpDesign.JDist.pdf(posterior.T)])
                    )

                # Calculate Kullback-Leibler Divergence
                KLD[itr_idx] = postExpLikelihoods - log_BME[itr_idx]

                # Information Entropy based on Entropy paper Eq. 38
                inf_entropy[itr_idx] = log_BME[itr_idx] - postExpPrior - \
                    postExpLikelihoods

                # TODO: BME correction when using Emulator
                # if self.emulator:
                #     BME_Corr[itr_idx] = self._corr_factor_BME(
                #         data, total_sigma2, posterior
                #         )

                # Clear memory
                gc.collect(generation=2)

            # ---------------- Store BME, Likelihoods for all ----------------
            # Likelihoods (Size: n_samples, n_bootstrap_itr)
            self.log_likes = logLikelihoods

            # BME (log), KLD, infEntropy (Size: 1,n_bootstrap_itr)
            self.log_BME = log_BME
            self.KLD = KLD
            self.inf_entropy = inf_entropy

            # TODO: BMECorrFactor (log) (Size: 1,n_bootstrap_itr)
            # if self.emulator: self.BMECorrFactor = BME_Corr

            # BME = BME + BMECorrFactor
            if self.emulator:
                self.log_BME = self.log_BME  # + self.BMECorrFactor

        # ---------------- Parameter Bayesian inference ----------------
        if self.inference_method.lower() == 'mcmc':
            # Instantiate the MCMC object
            MCMC_Obj = MCMC(self)
            self.posterior_df = MCMC_Obj.run_sampler(
                self.measured_data, total_sigma2
                )

        elif self.name.lower() == 'valid':
            # Convert to a dataframe if samples are provided after calibration.
            self.posterior_df = pd.DataFrame(self.samples, columns=par_names)

        else:
            # Rejection sampling
            self.posterior_df = self._rejection_sampling()

        # Provide posterior's summary
        print('\n')
        print('-'*15 + 'Posterior summary' + '-'*15)
        pd.options.display.max_columns = None
        pd.options.display.max_rows = None
        print(self.posterior_df.describe())
        print('-'*50)

        # -------- Model Discrepancy -----------
        if hasattr(self, 'error_model') and self.error_model \
           and self.name.lower() == 'calib':
            if self.inference_method.lower() == 'mcmc':
                self.error_MetaModel = MCMC_Obj.error_MetaModel
            else:
                # Select posterior mean as MAP
                if opt_sigma == "B":
                    posterior_df = self.posterior_df.values
                else:
                    posterior_df = self.posterior_df.values[:, :-Model.n_outputs]

                # Select posterior mean as Maximum a posteriori
                map_theta = posterior_df.mean(axis=0).reshape((1, n_params))
                # map_theta = stats.mode(Posterior_df,axis=0)[0]

                # Evaluate the (meta-)model at the MAP
                y_MAP, y_std_MAP = MetaModel.eval_metamodel(samples=map_theta)

                # Train a GPR meta-model using MAP
                self.error_MetaModel = MetaModel.create_model_error(
                    self.bias_inputs, y_MAP, Name=self.name
                    )

        # -------- Posterior predictive -----------
        self._posterior_predictive()

        # -----------------------------------------------------
        # ------------------ Visualization --------------------
        # -----------------------------------------------------
        # Create Output directory, if it doesn't exist already.
        out_dir = f'Outputs_Bayes_{Model.name}_{self.name}'
        os.makedirs(out_dir, exist_ok=True)

        # -------- Posterior parameters --------
        if opt_sigma != "B":
            par_names.extend(
                [self.Discrepancy.InputDisc.Marginals[i].name for i
                 in range(len(self.Discrepancy.InputDisc.Marginals))]
                )
        # Plot with corner
        figPosterior = corner.corner(self.posterior_df.to_numpy(),
                                     labels=par_names,
                                     quantiles=[0.15, 0.5, 0.85],
                                     show_titles=True,
                                     title_fmt=self.corner_title_fmt,
                                     labelpad=0.2,
                                     use_math_text=True,
                                     title_kwargs={"fontsize": 28},
                                     plot_datapoints=False,
                                     plot_density=False,
                                     fill_contours=True,
                                     smooth=0.5,
                                     smooth1d=0.5)

        # Loop over axes and set x limits
        if opt_sigma == "B":
            axes = np.array(figPosterior.axes).reshape(
                (len(par_names), len(par_names))
                )
            for yi in range(len(par_names)):
                ax = axes[yi, yi]
                ax.set_xlim(MetaModel.bound_tuples[yi])
                for xi in range(yi):
                    ax = axes[yi, xi]
                    ax.set_xlim(MetaModel.bound_tuples[xi])

        # Turn off gridlines
        for ax in figPosterior.axes:
            ax.grid(False)

        if self.emulator:
            plotname = f'/Posterior_Dist_{Model.name}_emulator'
        else:
            plotname = f'/Posterior_Dist_{Model.name}'

        figPosterior.set_size_inches((24, 16))
        figPosterior.savefig(f'./{out_dir}{plotname}.pdf',
                             bbox_inches='tight')

        # -------- Plot MAP --------
        if self.plot_map_pred:
            self._plot_max_a_posteriori()

        # -------- Plot log_BME dist --------
        if self.bootstrap and self.n_bootstrap_itrs > 1:
            # Computing the TOM performance
            self.log_BME_tom = stats.chi2.rvs(
                self.n_tot_measurement, size=self.log_BME.shape[0]
                )

            fig, ax = plt.subplots()
            sns.kdeplot(self.log_BME_tom, ax=ax, color="green", shade=True)
            sns.kdeplot(
                self.log_BME, ax=ax, color="blue", shade=True,
                label='Model BME')

            ax.set_xlabel('log(BME)')
            ax.set_ylabel('Probability density')

            legend_elements = [
                Patch(facecolor='green', edgecolor='green', label='TOM BME'),
                Patch(facecolor='blue', edgecolor='blue', label='Model BME')
                ]
            ax.legend(handles=legend_elements)

            if self.emulator:
                plotname = f'/BME_hist_{Model.name}_emulator'
            else:
                plotname = f'/BME_hist_{Model.name}'

            plt.savefig(f'./{out_dir}{plotname}.pdf', bbox_inches='tight')
            plt.show()
            plt.close()

        # -------- Posterior predictives --------
        if self.plot_post_pred:
            # Plot the posterior predictive
            self._plot_post_predictive()

        return self

    # -------------------------------------------------------------------------
    def _perturb_data(self, data, output_names):
        """
        Returns an array with `n_bootstrap_itrs` rows of perturbed data.
        The first row includes the original observation data.
        If `self.bayes_loocv` is True, a 2d-array will be returned with
        repeated rows and zero diagonal entries.

        Parameters
        ----------
        data : pandas DataFrame
            Observation data.
        output_names : list
            List of the output names.

        Returns
        -------
        final_data : array
            Perturbed data set.

        """
        noise_level = self.bootstrap_noise
        obs_data = data[output_names].values
        n_measurement, n_outs = obs_data.shape
        self.n_tot_measurement = obs_data[~np.isnan(obs_data)].shape[0]
        # Number of bootstrap iterations
        if self.bayes_loocv:
            self.n_bootstrap_itrs = self.n_tot_measurement

        # Pass loocv dataset
        if self.bayes_loocv:
            obs = obs_data.T[~np.isnan(obs_data.T)]
            final_data = np.repeat(np.atleast_2d(obs), self.n_bootstrap_itrs,
                                   axis=0)
            np.fill_diagonal(final_data, 0)
            return final_data

        else:
            final_data = np.zeros(
                (self.n_bootstrap_itrs, self.n_tot_measurement)
                )
            final_data[0] = obs_data.T[~np.isnan(obs_data.T)]
            for itrIdx in range(1, self.n_bootstrap_itrs):
                data = np.zeros((n_measurement, n_outs))
                for idx in range(len(output_names)):
                    std = np.nanstd(obs_data[:, idx])
                    if std == 0:
                        std = 0.001
                    noise = std * noise_level
                    data[:, idx] = np.add(
                        obs_data[:, idx],
                        np.random.normal(0, 1, obs_data.shape[0]) * noise,
                    )

                final_data[itrIdx] = data.T[~np.isnan(data.T)]

            return final_data

    # -------------------------------------------------------------------------
    def _logpdf(self, x, mean, cov):
        """
        Computes the log-likelihood based on a multivariate normal distribution.

        Parameters
        ----------
        x : array_like
            Model outputs to be compared with the observations.
        mean : array_like
            Observation data.
        cov : 2d array
            Covariance matrix of the distribution.

        Returns
        -------
        log_lik : float
            Log likelihood.

        """
        n = len(mean)
        L = spla.cholesky(cov, lower=True)
        beta = np.sum(np.log(np.diag(L)))
        dev = x - mean
        alpha = dev.dot(spla.cho_solve((L, True), dev))
        log_lik = -0.5 * alpha - beta - n / 2. * np.log(2 * np.pi)
        return log_lik
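
    # Note: up to numerical error, _logpdf is equivalent to
    # stats.multivariate_normal.logpdf(x, mean=mean, cov=cov); the explicit
    # Cholesky factorization here presumably avoids repeated matrix
    # inversions and keeps the computation stable for larger covariances.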

    # -------------------------------------------------------------------------
    def _eval_model(self, samples=None, key='MAP'):
        """
        Evaluates Forward Model.

        Parameters
        ----------
        samples : array of shape (n_samples, n_params), optional
            Parameter sets. The default is None.
        key : str, optional
            Key string to be passed to the run_model_parallel method.
            The default is 'MAP'.

        Returns
        -------
        model_outputs : dict
            A dictionary of the model outputs.

        """
        MetaModel = self.MetaModel
        Model = MetaModel.ModelObj

        if samples is None:
            self.samples = MetaModel.ExpDesign.generate_samples(
                self.n_samples, 'random')
        else:
            self.samples = samples
            self.n_samples = len(samples)

        model_outputs, _ = Model.run_model_parallel(
            self.samples, key_str=key+self.name)

        # Clean up
        # Zip the subdirectories
        try:
            dir_name = f'{Model.name}MAP{self.name}'
            key = dir_name + '_'
            Model.zip_subdirs(dir_name, key)
        except Exception:
            pass

        return model_outputs

    # -------------------------------------------------------------------------
    def _kernel_rbf(self, X, hyperparameters):
        """
        Isotropic squared exponential kernel.

        Higher l values lead to smoother functions and therefore to coarser
        approximations of the training data. Lower l values make functions
        more wiggly with wide uncertainty regions between training data points.

        sigma_f controls the marginal variance of b(x)

        Parameters
        ----------
        X : ndarray of shape (n_samples_X, n_features)

        hyperparameters : Dict
            Lambda characteristic length
            sigma_f controls the marginal variance of b(x)
            sigma_0 unresolvable error nugget term, interpreted as random
                    error that cannot be attributed to measurement error.
        Returns
        -------
        var_cov_matrix : ndarray of shape (n_samples_X,n_samples_X)
            Kernel k(X, X).

        """
        from sklearn.gaussian_process.kernels import RBF
        min_max_scaler = preprocessing.MinMaxScaler()
        X_minmax = min_max_scaler.fit_transform(X)

        nparams = len(hyperparameters)
        # characteristic length (0,1]
        Lambda = hyperparameters[0]
        # sigma_f controls the marginal variance of b(x)
        sigma2_f = hyperparameters[1]

        # cov_matrix = sigma2_f*rbf_kernel(X_minmax, gamma = 1/Lambda**2)

        rbf = RBF(length_scale=Lambda)
        cov_matrix = sigma2_f * rbf(X_minmax)
        if nparams > 2:
            # (unresolvable error) nugget term that is interpreted as random
            # error that cannot be attributed to measurement error.
            sigma2_0 = hyperparameters[2:]
            cov_matrix += np.sum(sigma2_0) * np.eye(cov_matrix.shape[0])

        return cov_matrix

    # -------------------------------------------------------------------------
    def normpdf(self, outputs, obs_data, total_sigma2s, sigma2=None, std=None):
        """
        Calculates the log-likelihood of simulation outputs compared with
        observation data.

        Parameters
        ----------
        outputs : dict
            A dictionary containing the simulation outputs as array of shape
            (n_samples, n_measurement) for each model output.
        obs_data : dict
            A dictionary/dataframe containing the observation data.
        total_sigma2s : dict
            A dictionary with known values of the covariance diagonal entries,
            a.k.a sigma^2.
        sigma2 : array, optional
            An array of the sigma^2 samples, when the covariance diagonal
            entries are unknown and are being jointly inferred. The default is
            None.
        std : dict, optional
            A dictionary containing the root mean squared error as array of
            shape (n_samples, n_measurement) for each model output. The default
            is None.

        Returns
        -------
        logLik : array of shape (n_samples)
            Log-likelihoods.

        """
        Model = self.MetaModel.ModelObj
        logLik = 0.0

        # Extract the requested model outputs for likelihood calculation
        if self.req_outputs is None:
            req_outputs = Model.Output.names
        else:
            req_outputs = list(self.req_outputs)

        # Loop over the outputs
        for idx, out in enumerate(req_outputs):

            # (Meta)Model Output
            nsamples, nout = outputs[out].shape

            # Prepare data and remove NaN
            try:
                data = obs_data[out].values[~np.isnan(obs_data[out])]
            except AttributeError:
                data = obs_data[out][~np.isnan(obs_data[out])]

            # Prepare sigma2s
            non_nan_indices = ~np.isnan(total_sigma2s[out])
            tot_sigma2s = total_sigma2s[out][non_nan_indices][:nout]

            # Add the std of the PCE if the emulator is chosen.
            if self.emulator:
                if std is not None:
                    std_pce = std[out]
                else:
                    std_pce = np.mean(
                        self._std_pce_prior_pred[out], axis=0)
                # Expected value of variance (Assump: i.i.d stds)
                tot_sigma2s += std_pce**2

            # If sigma2 is not given, use given total_sigma2s
            if sigma2 is None:
                logLik += stats.multivariate_normal.logpdf(
                    outputs[out], data, np.diag(tot_sigma2s))
                continue

            # Loop over each run/sample and calculate logLikelihood
            logliks = np.zeros(nsamples)
            for s_idx in range(nsamples):

                # Simulation run
                tot_outputs = outputs[out]

                # Covariance Matrix
                covMatrix = np.diag(tot_sigma2s)

                if sigma2 is not None:
                    # Check the type error term
                    if hasattr(self, 'bias_inputs') and \
                       not hasattr(self, 'error_model'):
                        # Infer a bias model using Gaussian Process Regression
                        bias_inputs = np.hstack(
                            (self.bias_inputs[out],
                             tot_outputs[s_idx].reshape(-1, 1)))

                        params = sigma2[s_idx, idx*3:(idx+1)*3]
                        covMatrix = self._kernel_rbf(bias_inputs, params)
                    else:
                        # Infer equal sigma2s
                        try:
                            sigma_2 = sigma2[s_idx, idx]
                        except TypeError:
                            sigma_2 = 0.0

                        covMatrix += sigma_2 * np.eye(nout)
                        # covMatrix = np.diag(sigma2 * total_sigma2s)

                # Select the data points to compare
                if self.selected_indices is not None:
                    indices = self.selected_indices[out]
                    covMatrix = np.diag(covMatrix[indices, indices])
                else:
                    indices = list(range(nout))

                # Compute loglikelihood
                logliks[s_idx] = self._logpdf(
                    tot_outputs[s_idx, indices], data[indices], covMatrix
                    )

            logLik += logliks
        return logLik

    # -------------------------------------------------------------------------
    def _corr_factor_BME(self, Data, total_sigma2s, posterior):
        """
        Calculates the correction factor for BMEs.
        """
        MetaModel = self.MetaModel
        OrigModelOutput = MetaModel.ExpDesign.Y
        Model = MetaModel.ModelObj

        # Posterior with Gaussian likelihood
        postDist = stats.gaussian_kde(posterior.T)

        # Remove NaN
        Data = Data[~np.isnan(Data)]
        total_sigma2s = total_sigma2s[~np.isnan(total_sigma2s)]

        # Covariance Matrix
        covMatrix = np.diag(total_sigma2s[:self.n_tot_measurement])

        # Extract the requested model outputs for likelihood calculation
        if self.req_outputs is None:
            OutputType = Model.Output.names
        else:
            OutputType = list(self.req_outputs)

        # SampleSize = OrigModelOutput[OutputType[0]].shape[0]


        # Flatten the OutputType for OrigModel
        TotalOutputs = np.concatenate([OrigModelOutput[x] for x in OutputType], 1)

        NrofBayesSamples = self.n_samples
        # Evaluate MetaModel on the experimental design
        Samples = MetaModel.ExpDesign.X
        OutputRS, stdOutputRS = MetaModel.eval_metamodel(samples=Samples)

        # Reset the NrofSamples to NrofBayesSamples
        self.n_samples = NrofBayesSamples

        # Flatten the OutputType for MetaModel
        TotalPCEOutputs = np.concatenate([OutputRS[x] for x in OutputRS], 1)
        TotalPCEstdOutputRS = np.concatenate([stdOutputRS[x] for x in stdOutputRS], 1)

        logweight = 0
        for i, sample in enumerate(Samples):
            # Compute the likelihood of the model output vs. the metamodel
            covMatrix = np.diag(TotalPCEstdOutputRS[i]**2)
            logLik = self._logpdf(TotalOutputs[i], TotalPCEOutputs[i], covMatrix)
            # Compute posterior likelihood of the collocation points
            logpostLik = np.log(postDist.pdf(sample[:,None]))[0]
            if logpostLik != -np.inf:
                logweight += logLik + logpostLik
        return logweight

    # -------------------------------------------------------------------------
    def _rejection_sampling(self):
        """
        Performs rejection sampling to update the prior distribution on the
        input parameters.

        Returns
        -------
        posterior : pandas.dataframe
            Posterior samples of the input parameters.

        """

        MetaModel = self.MetaModel
        try:
            sigma2_prior = self.Discrepancy.sigma2_prior
        except AttributeError:
            sigma2_prior = None

        samples = self.samples

        # If the discrepancy is defined as a distribution, append its
        # samples to the parameter samples.
        if sigma2_prior is not None:
            samples = np.hstack((samples, sigma2_prior))

        # Take the first column of likelihoods (observation data without noise)
        # np.float128 (where available) guards against underflow of exp().
        likelihoods = np.exp(self.log_likes[:, 0], dtype=np.float128)
        n_samples = len(likelihoods)
        norm_likelihoods = likelihoods / np.max(likelihoods)

        # Normalize based on the minimum if all likelihoods are zero
        if all(likelihoods == 0.0):
            likelihoods = self.log_likes[:, 0]
            norm_likelihoods = likelihoods / np.min(likelihoods)

        # Random numbers between 0 and 1
        unif = np.random.rand(1, n_samples)[0]

        # Reject the poorly performing prior samples
        accepted_samples = samples[norm_likelihoods >= unif]

        # Output the posterior
        par_names = MetaModel.ExpDesign.par_names
        if sigma2_prior is not None:
            for name in self.Discrepancy.name:
                par_names.append(name)

        return pd.DataFrame(accepted_samples, columns=par_names)

    # -------------------------------------------------------------------------
    def _posterior_predictive(self):
        """
        Stores the prior- and posterior predictive samples, i.e. model
        evaluations using the samples, into hdf5 files.

        priorPredictive.hdf5 : Prior predictive samples.
        postPredictive_wo_noise.hdf5 : Posterior predictive samples without
        the additive noise.
        postPredictive.hdf5 : Posterior predictive samples with the additive
        noise.

        Returns
        -------
        None.

        """

        MetaModel = self.MetaModel
        Model = MetaModel.ModelObj

        # Make a directory to save the prior/posterior predictive
        out_dir = f'Outputs_Bayes_{Model.name}_{self.name}'
        os.makedirs(out_dir, exist_ok=True)

        # Read observation data and perturb it if requested
        if self.measured_data is None:
            self.measured_data = Model.read_observation(case=self.name)

        if not isinstance(self.measured_data, pd.DataFrame):
            self.measured_data = pd.DataFrame(self.measured_data)

        # X_values
        x_values = MetaModel.ExpDesign.x_values

        try:
            sigma2_prior = self.Discrepancy.sigma2_prior
        except AttributeError:
            sigma2_prior = None

        # Extract posterior samples
        posterior_df = self.posterior_df

        # Take care of the sigma2
        if sigma2_prior is not None:
            try:
                sigma2s = posterior_df[self.Discrepancy.name].values
                posterior_df = posterior_df.drop(
                    labels=self.Discrepancy.name, axis=1
                    )
            except KeyError:
                sigma2s = self.sigma2s

        # Posterior predictive
        if self.emulator:
            if self.inference_method == 'rejection':
                prior_pred = self.__mean_pce_prior_pred
            if self.name.lower() != 'calib':
                post_pred = self.__mean_pce_prior_pred
                post_pred_std = self._std_pce_prior_pred
            else:
                post_pred, post_pred_std = MetaModel.eval_metamodel(
                    samples=posterior_df.values
                    )

        else:
            if self.inference_method == 'rejection':
                prior_pred = self.__model_prior_pred
            if self.name.lower() != 'calib':
                post_pred = self.__model_prior_pred
                post_pred_std = self._std_pce_prior_pred
            else:
                post_pred = self._eval_model(
                    samples=posterior_df.values, key='PostPred'
                    )
        # Correct the predictions with Model discrepancy
        if hasattr(self, 'error_model') and self.error_model:
            y_hat, y_std = self.error_MetaModel.eval_model_error(
                self.bias_inputs, post_pred
                )
            post_pred, post_pred_std = y_hat, y_std

        # Add discrepancy from likelihood Sample to the current posterior runs
        total_sigma2 = self.Discrepancy.total_sigma2
        post_pred_withnoise = copy.deepcopy(post_pred)
        for varIdx, var in enumerate(Model.Output.names):
            for i in range(len(post_pred[var])):
                pred = post_pred[var][i]

                # Known sigma2s
                clean_sigma2 = total_sigma2[var][~np.isnan(total_sigma2[var])]
                tot_sigma2 = clean_sigma2[:len(pred)]
                cov = np.diag(tot_sigma2)

                # Check the type error term
                if sigma2_prior is not None:
                    # Inferred sigma2s
                    if hasattr(self, 'bias_inputs') and \
                       not hasattr(self, 'error_model'):
                        # TODO: Infer a bias model using GPR
                        bias_inputs = np.hstack((
                            self.bias_inputs[var], pred.reshape(-1, 1)))
                        params = sigma2s[i, varIdx*3:(varIdx+1)*3]
                        cov = self._kernel_rbf(bias_inputs, params)
                    else:
                        # Infer equal sigma2s
                        try:
                            sigma2 = sigma2s[i, varIdx]
                        except TypeError:
                            sigma2 = 0.0

                        # Convert biasSigma2s to a covMatrix
                        cov += sigma2 * np.eye(len(pred))

                if self.emulator:
                    if hasattr(MetaModel, 'rmse') and \
                       MetaModel.rmse is not None:
                        stdPCE = MetaModel.rmse[var]
                    else:
                        stdPCE = post_pred_std[var][i]
                    # Expected value of variance (Assump: i.i.d stds)
                    cov += np.diag(stdPCE**2)

                # Sample a multivariate normal distribution with mean of
                # prediction and variance of cov
                post_pred_withnoise[var][i] = np.random.multivariate_normal(
                    pred, cov, 1
                    )

        # ----- Prior Predictive -----
        if self.inference_method.lower() == 'rejection':
            # Create hdf5 metadata
            hdf5file = f'{out_dir}/priorPredictive.hdf5'
            hdf5_exist = os.path.exists(hdf5file)
            if hdf5_exist:
                os.remove(hdf5file)
            file = h5py.File(hdf5file, 'a')

            # Store x_values
            if isinstance(x_values, dict):
                grp_x_values = file.create_group("x_values/")
                for varIdx, var in enumerate(Model.Output.names):
                    grp_x_values.create_dataset(var, data=x_values[var])
            else:
                file.create_dataset("x_values", data=x_values)

            # Store prior predictive
            grpY = file.create_group("EDY/")
            for varIdx, var in enumerate(Model.Output.names):
                grpY.create_dataset(var, data=prior_pred[var])
            file.close()

        # ----- Posterior Predictive only model evaluations -----
        # Create hdf5 metadata
        hdf5file = out_dir+'/postPredictive_wo_noise.hdf5'
        hdf5_exist = os.path.exists(hdf5file)
        if hdf5_exist:
            os.remove(hdf5file)
        file = h5py.File(hdf5file, 'a')

        # Store x_values
        if isinstance(x_values, dict):
            grp_x_values = file.create_group("x_values/")
            for varIdx, var in enumerate(Model.Output.names):
                grp_x_values.create_dataset(var, data=x_values[var])
        else:
            file.create_dataset("x_values", data=x_values)

        # Store posterior predictive
        grpY = file.create_group("EDY/")
        for varIdx, var in enumerate(Model.Output.names):
            grpY.create_dataset(var, data=post_pred[var])
        file.close()

        # ----- Posterior Predictive with noise -----
        # Create hdf5 metadata
        hdf5file = out_dir+'/postPredictive.hdf5'
        hdf5_exist = os.path.exists(hdf5file)
        if hdf5_exist:
            os.remove(hdf5file)
        file = h5py.File(hdf5file, 'a')

        # Store x_values
        if isinstance(x_values, dict):
            grp_x_values = file.create_group("x_values/")
            for varIdx, var in enumerate(Model.Output.names):
                grp_x_values.create_dataset(var, data=x_values[var])
        else:
            file.create_dataset("x_values", data=x_values)

        # Store posterior predictive with noise
        grpY = file.create_group("EDY/")
        for varIdx, var in enumerate(Model.Output.names):
            grpY.create_dataset(var, data=post_pred_withnoise[var])
        file.close()

        return
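
    # Editor's note: the files written above can be read back with h5py;
    # a sketch (group/dataset names as created in this method, <var>
    # standing for a model output name):
    #
    #   with h5py.File(f'{out_dir}/postPredictive.hdf5', 'r') as f:
    #       x_vals = np.array(f['x_values'])     # or f['x_values/<var>']
    #       y_post = np.array(f['EDY/<var>'])    # noisy posterior runs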

    # -------------------------------------------------------------------------
    def _plot_max_a_posteriori(self):
        """
        Plots the response of the model output against that of the metamodel at
        the maximum a posteriori sample (mean or mode of posterior.)

        Returns
        -------
        None.

        """

        MetaModel = self.MetaModel
        Model = MetaModel.ModelObj
        out_dir = f'Outputs_Bayes_{Model.name}_{self.name}'
        opt_sigma = self.Discrepancy.opt_sigma

        # -------- Find MAP and run MetaModel and origModel --------
        # Drop inferred sigma2 columns if present
        if opt_sigma == "B":
            posterior = self.posterior_df.values
        else:
            posterior = self.posterior_df.values[:, :-Model.n_outputs]

        # Compute the MAP
        if self.max_a_posteriori.lower() == 'mean':
            map_theta = posterior.mean(axis=0).reshape(
                (1, MetaModel.n_params))
        else:
            map_theta = stats.mode(posterior, axis=0)[0].reshape(
                (1, MetaModel.n_params))
        # Print report
        print("\nPoint estimator:\n", map_theta[0])

        # Run the models for MAP
        # MetaModel
        map_metamodel_mean, map_metamodel_std = MetaModel.eval_metamodel(
            samples=map_theta)
        self.map_metamodel_mean = map_metamodel_mean
        self.map_metamodel_std = map_metamodel_std

        # origModel
        map_orig_model = self._eval_model(samples=map_theta)
        self.map_orig_model = map_orig_model

        # Extract slicing index
        x_values = map_orig_model['x_values']

        # List of markers and colors
        Color = ['k', 'b', 'g', 'r']
        Marker = 'x'

        # Create a PdfPages object
        pdf = PdfPages(f'./{out_dir}/MAP_PCE_vs_Model_{self.name}.pdf')
        fig = plt.figure()
        for i, key in enumerate(Model.Output.names):

            # A single MAP sample was evaluated, so take the first row
            y_val = map_orig_model[key][0]
            y_pce_val = map_metamodel_mean[key][0]
            y_pce_val_std = map_metamodel_std[key][0]

            plt.plot(x_values, y_val, color=Color[i], marker=Marker,
                     lw=2.0, label='$Y_{MAP}^{M}$')

            plt.plot(
                x_values, y_pce_val, color=Color[i], lw=2.0,
                marker=Marker, linestyle='--', label='$Y_{MAP}^{PCE}$'
                )
            # plot the confidence interval
            plt.fill_between(
                x_values, y_pce_val - 1.96*y_pce_val_std,
                y_pce_val + 1.96*y_pce_val_std,
                color=Color[i], alpha=0.15
                )

            # Calculate the adjusted R_squared and RMSE
            R2 = r2_score(y_pce_val.reshape(-1, 1), y_val.reshape(-1, 1))
            rmse = np.sqrt(mean_squared_error(y_pce_val, y_val))

            plt.ylabel(key)
            plt.xlabel("Time [s]")
            plt.title(f'Model vs MetaModel {key}')

            ax = fig.axes[0]
            leg = ax.legend(loc='best', frameon=True)
            fig.canvas.draw()
            p = leg.get_window_extent().transformed(ax.transAxes.inverted())
            ax.text(
                p.p0[1]-0.05, p.p1[1]-0.25,
                f'RMSE = {rmse:.3f}\n$R^2$ = {R2:.3f}',
                transform=ax.transAxes, color='black',
                bbox=dict(facecolor='none', edgecolor='black',
                          boxstyle='round,pad=1'))

            plt.show()

            # save the current figure
            pdf.savefig(fig, bbox_inches='tight')

            # Destroy the current plot
            plt.clf()

        pdf.close()

    # -------------------------------------------------------------------------
    def _plot_post_predictive(self):
        """
        Plots the posterior predictives against the observation data.

        Returns
        -------
        None.

        """

        Model = self.MetaModel.ModelObj
        out_dir = f'Outputs_Bayes_{Model.name}_{self.name}'
        # Plot the posterior predictive
        for out_idx, out_name in enumerate(Model.Output.names):
            fig, ax = plt.subplots()
            with sns.axes_style("ticks"):
                x_key = list(self.measured_data)[0]

                # --- Read prior and posterior predictive ---
                if self.inference_method == 'rejection':
                    #  --- Prior ---
                    # Load posterior predictive
                    f = h5py.File(
                        f'{out_dir}/priorPredictive.hdf5', 'r+')

                    try:
                        x_coords = np.array(f[f"x_values/{out_name}"])
                    except KeyError:
                        x_coords = np.array(f["x_values"])

                    X_values = np.repeat(x_coords, 10000)

                    prior_pred_df = {}
                    prior_pred_df[x_key] = X_values
                    prior_pred_df[out_name] = np.array(
                        f[f"EDY/{out_name}"])[:10000].flatten('F')
                    prior_pred_df = pd.DataFrame(prior_pred_df)

                    tags_post = ['prior'] * len(prior_pred_df)
                    prior_pred_df.insert(
                        len(prior_pred_df.columns), "Tags", tags_post,
                        True)
                    f.close()

                    # --- Posterior ---
                    f = h5py.File(f"{out_dir}/postPredictive.hdf5", 'r+')

                    X_values = np.repeat(
                        x_coords, np.array(f[f"EDY/{out_name}"]).shape[0])

                    post_pred_df = {}
                    post_pred_df[x_key] = X_values
                    post_pred_df[out_name] = np.array(
                        f[f"EDY/{out_name}"]).flatten('F')

                    post_pred_df = pd.DataFrame(post_pred_df)

                    tags_post = ['posterior'] * len(post_pred_df)
                    post_pred_df.insert(
                        len(post_pred_df.columns), "Tags", tags_post, True)
                    f.close()
                    # Concatenate two dataframes based on x_values
                    frames = [prior_pred_df, post_pred_df]
                    all_pred_df = pd.concat(frames)

                    # --- Plot posterior predictive ---
                    sns.violinplot(
                        x=x_key, y=out_name, data=all_pred_df, hue="Tags",
                        legend=False, ax=ax, split=True, inner=None,
                        color=".8")

                    # --- Plot Data ---
                    # Find the x,y coordinates for each point
                    x_coords = np.arange(x_coords.shape[0])
                    first_header = list(self.measured_data)[0]
                    obs_data = self.measured_data.round({first_header: 6})
                    sns.pointplot(
                        x=first_header, y=out_name, color='g', markers='x',
                        linestyles='', capsize=16, data=obs_data, ax=ax)

                    ax.errorbar(
                        x_coords, obs_data[out_name].values,
                        yerr=1.96*self.measurement_error[out_name],
                        ecolor='g', fmt=' ', zorder=-1)

                    # Add labels to the legend
                    handles, labels = ax.get_legend_handles_labels()
                    labels.append('Data')

                    data_marker = mlines.Line2D(
                        [], [], color='lime', marker='+', linestyle='None',
                        markersize=10)
                    handles.append(data_marker)

                    # Add legend
                    ax.legend(handles=handles, labels=labels, loc='best',
                              fontsize='large', frameon=True)

                else:
                    # Load posterior predictive
                    f = h5py.File(f"{out_dir}/postPredictive.hdf5", 'r+')

                    try:
                        x_coords = np.array(f["x_values"])
                    except KeyError:
                        x_coords = np.array(f[f"x_values/{out_name}"])

                    mu = np.mean(np.array(f[f"EDY/{out_name}"]), axis=0)
                    std = np.std(np.array(f[f"EDY/{out_name}"]), axis=0)

                    # --- Plot posterior predictive ---
                    plt.plot(
                        x_coords, mu, marker='o', color='b',
                        label='Mean Post. Predictive')
                    plt.fill_between(
                        x_coords, mu-1.96*std, mu+1.96*std, color='b',
                        alpha=0.15)

                    # --- Plot Data ---
                    ax.plot(
                        x_coords, self.measured_data[out_name].values,
                        'ko', label='data', markeredgecolor='w')

                    # --- Plot ExpDesign ---
                    orig_ED_Y = self.MetaModel.ExpDesign.Y[out_name]
                    for output in orig_ED_Y:
                        plt.plot(
                            x_coords, output, color='grey', alpha=0.15
                            )

                    # Add labels for axes
                    plt.xlabel('Time [s]')
                    plt.ylabel(out_name)

                    # Add labels to the legend
                    handles, labels = ax.get_legend_handles_labels()

                    patch = Patch(color='b', alpha=0.15)
                    handles.insert(1, patch)
                    labels.insert(1, '95 $\\%$ CI')

                    # Add legend
                    ax.legend(handles=handles, labels=labels, loc='best',
                              frameon=True)

                # Save figure in pdf format
                if self.emulator:
                    plotname = f'/Post_Prior_Pred_{Model.name}_emulator'
                else:
                    plotname = f'/Post_Prior_Pred_{Model.name}'

                fig.savefig(f'./{out_dir}{plotname}_{out_name}.pdf',
                            bbox_inches='tight')
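
Editor's note: the private methods above implement the rejection-sampling workflow end to end; normpdf evaluates the log-likelihoods, _rejection_sampling performs the accept/reject step, and _posterior_predictive propagates the result through the model. Below is a minimal, self-contained sketch of the accept/reject step on made-up toy data (the names samples and log_likes stand in for the corresponding attributes):

import numpy as np

rng = np.random.default_rng(0)

# Toy prior sample and log-likelihoods (stand-ins for self.samples
# and self.log_likes[:, 0])
samples = rng.uniform(-5, 5, size=(10_000, 2))
log_likes = -0.5 * np.sum((samples - 1.0)**2, axis=1)

# Scale likelihoods by their maximum so acceptance probabilities
# lie in [0, 1]
norm_likes = np.exp(log_likes - log_likes.max())

# Accept each sample with probability proportional to its likelihood
unif = rng.random(len(norm_likes))
posterior = samples[norm_likes >= unif]
print(posterior.shape)  # the accepted draws approximate the posterior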

Methods

def create_inference(self)

Starts the inference.

Returns

BayesInference : obj
The Bayes inference object.
Expand source code
def create_inference(self):
    """
    Starts the inference.

    Returns
    -------
    BayesInference : obj
        The Bayes inference object.

    """

    # Set some variables
    MetaModel = self.MetaModel
    Model = MetaModel.ModelObj
    n_params = MetaModel.n_params
    output_names = Model.Output.names
    par_names = MetaModel.ExpDesign.par_names

    # If the prior is set by the user, take it.
    if self.samples is None:
        self.samples = MetaModel.ExpDesign.generate_samples(
            self.n_samples, 'random')
    else:
        try:
            samples = self.samples.values
        except AttributeError:
            samples = self.samples

        # Take care of an additional Sigma2s
        self.samples = samples[:, :n_params]

        # Update number of samples
        self.n_samples = self.samples.shape[0]

    # ---------- Preparation of observation data ----------
    # Read observation data and perturb it if requested.
    if self.measured_data is None:
        self.measured_data = Model.read_observation(case=self.name)
    # Convert measured_data to a data frame
    if not isinstance(self.measured_data, pd.DataFrame):
        self.measured_data = pd.DataFrame(self.measured_data)

    # Extract the total number of measurement points
    if self.name.lower() == 'calib':
        self.n_tot_measurement = Model.n_obs
    else:
        self.n_tot_measurement = Model.n_obs_valid

    # Find measurement error (if not given) for post predictive plot
    if not hasattr(self, 'measurement_error'):
        if isinstance(self.Discrepancy, dict):
            Disc = self.Discrepancy['known']
        else:
            Disc = self.Discrepancy
        if isinstance(Disc.parameters, dict):
            self.measurement_error = {k: np.sqrt(Disc.parameters[k]) for k
                                      in Disc.parameters.keys()}
        else:
            try:
                self.measurement_error = np.sqrt(Disc.parameters)
            except TypeError:
                pass

    # ---------- Preparation of variance for covariance matrix ----------
    # Independent and identically distributed
    total_sigma2 = dict()
    opt_sigma_flag = isinstance(self.Discrepancy, dict)
    opt_sigma = None
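    # opt_sigma options handled below: 'A' known error with an inferred
    # bias term, 'B' fully known sigma2s, 'C' sigma2s inferred jointly
    # with the parameters.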
    for key_idx, key in enumerate(output_names):

        # Find opt_sigma
        if opt_sigma_flag and opt_sigma is None:
            # Option A: known error with unknown bias term
            opt_sigma = 'A'
            known_discrepancy = self.Discrepancy['known']
            self.Discrepancy = self.Discrepancy['infer']
            sigma2 = np.array(known_discrepancy.parameters[key])

        elif opt_sigma == 'A' or self.Discrepancy.parameters is not None:
            # Option B: The sigma2 is known (no bias term)
            if opt_sigma == 'A':
                sigma2 = np.array(known_discrepancy.parameters[key])
            else:
                opt_sigma = 'B'
                sigma2 = np.array(self.Discrepancy.parameters[key])

        elif not isinstance(self.Discrepancy.InputDisc, str):
            # Option C: The sigma2 is unknown (bias term including error)
            opt_sigma = 'C'
            self.Discrepancy.opt_sigma = opt_sigma
            n_measurement = self.measured_data[key].values.shape
            sigma2 = np.zeros((n_measurement[0]))

        total_sigma2[key] = sigma2

        self.Discrepancy.opt_sigma = opt_sigma
        self.Discrepancy.total_sigma2 = total_sigma2

    # If inferred sigma2s obtained from e.g. calibration are given
    try:
        self.sigma2s = self.Discrepancy.get_sample(self.n_samples)
    except:
        pass

    # ---------------- Bootstrap & TOM --------------------
    if self.bootstrap or self.bayes_loocv:
        if len(self.perturbed_data) == 0:
            # Add zero-mean noise to the observation data
            self.perturbed_data = self._perturb_data(
                self.measured_data, output_names
                )
        else:
            self.n_bootstrap_itrs = len(self.perturbed_data)

        # -------- Model Discrepancy -----------
        if hasattr(self, 'error_model') and self.error_model \
           and self.name.lower() != 'calib':
            # Select posterior mean as MAP
            MAP_theta = self.samples.mean(axis=0).reshape((1, n_params))
            # MAP_theta = stats.mode(self.samples,axis=0)[0]

            # Evaluate the (meta-)model at the MAP
            y_MAP, y_std_MAP = MetaModel.eval_metamodel(samples=MAP_theta)

            # Train a GPR meta-model using MAP
            self.error_MetaModel = MetaModel.create_model_error(
                self.bias_inputs, y_MAP, Name=self.name
                )

        # -----------------------------------------------------
        # ----- Loop over the perturbed observation data ------
        # -----------------------------------------------------
        # Initialize arrays
        logLikelihoods = np.zeros((self.n_samples, self.n_bootstrap_itrs),
                                  dtype=np.float16)
        BME_Corr = np.zeros((self.n_bootstrap_itrs))
        log_BME = np.zeros((self.n_bootstrap_itrs))
        KLD = np.zeros((self.n_bootstrap_itrs))
        inf_entropy = np.zeros((self.n_bootstrap_itrs))

        # Compute the prior predictions
        surrError = None
        # Evaluate the MetaModel
        if self.emulator:
            y_hat, y_std = MetaModel.eval_metamodel(samples=self.samples)
            self.__mean_pce_prior_pred = y_hat
            self._std_pce_prior_pred = y_std

            # Correct the predictions with Model discrepancy
            if hasattr(self, 'error_model') and self.error_model:
                y_hat_corr, y_std = self.error_MetaModel.eval_model_error(
                    self.bias_inputs, self.__mean_pce_prior_pred
                    )
                self.__mean_pce_prior_pred = y_hat_corr
                self._std_pce_prior_pred = y_std

            # Surrogate model's error using RMSE of test data
            if hasattr(MetaModel, 'rmse'):
                surrError = MetaModel.rmse
            else:
                surrError = None

        else:
            # Evaluate the original model
            self.__model_prior_pred = self._eval_model(
                samples=self.samples, key='PriorPred'
                )

        # Start the likelihood-BME computations for the perturbed data
        for itr_idx, data in tqdm(
                enumerate(self.perturbed_data), ascii=True,
                desc="Boostraping the BME calculations"
                ):

            # ---------------- Likelihood calculation ----------------
            if self.emulator:
                model_evals = self.__mean_pce_prior_pred
            else:
                model_evals = self.__model_prior_pred

            # Leave one out
            if self.bayes_loocv:
                self.selected_indices = np.nonzero(data)[0]

            # Prepare data dataframe
            nobs = list(self.measured_data.count().values[1:])
            numbers = list(map(sum, zip([0] + nobs, nobs)))
            indices = list(zip([0] + numbers, numbers))
            data_dict = {
                output_names[i]: data[j:k] for i, (j, k) in
                enumerate(indices)
                }

            # Unknown sigma2
            if opt_sigma == 'C' or hasattr(self, 'sigma2s'):
                logLikelihoods[:, itr_idx] = self.normpdf(
                    model_evals, data_dict, total_sigma2,
                    sigma2=self.sigma2s, std=surrError
                    )
            else:
                # known sigma2
                logLikelihoods[:, itr_idx] = self.normpdf(
                    model_evals, data_dict, total_sigma2,
                    std=surrError
                    )

            # ---------------- BME Calculations ----------------
            # BME (log)
            log_BME[itr_idx] = np.log(
                np.nanmean(np.exp(logLikelihoods[:, itr_idx],
                                  dtype=np.float128))
                )
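            # i.e. the log of the arithmetic mean of the likelihoods over
            # the prior sample: a brute-force Monte Carlo estimate of the
            # Bayesian model evidence.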

            # Rejection Step
            # Random numbers between 0 and 1
            unif = np.random.rand(1, self.n_samples)[0]

            # Reject the poorly performed prior
            Likelihoods = np.exp(logLikelihoods[:, itr_idx],
                                 dtype=np.float64)
            accepted = (Likelihoods/np.max(Likelihoods)) >= unif
            posterior = self.samples[accepted]

            # Posterior-based expectation of likelihoods
            postExpLikelihoods = np.mean(
                logLikelihoods[:, itr_idx][accepted]
                )

            # Posterior-based expectation of prior densities
            postExpPrior = np.mean(
                np.log([MetaModel.ExpDesign.JDist.pdf(posterior.T)])
                )

            # Calculate Kullback-Leibler Divergence
            KLD[itr_idx] = postExpLikelihoods - log_BME[itr_idx]

            # Information Entropy based on Entropy paper Eq. 38
            inf_entropy[itr_idx] = log_BME[itr_idx] - postExpPrior - \
                postExpLikelihoods

            # TODO: BME correction when using Emulator
            # if self.emulator:
            #     BME_Corr[itr_idx] = self._corr_factor_BME(
            #         data, total_sigma2, posterior
            #         )

            # Clear memory
            gc.collect(generation=2)

        # ---------------- Store BME, Likelihoods for all ----------------
        # Likelihoods (Size: n_samples, n_bootstrap_itr)
        self.log_likes = logLikelihoods

        # BME (log), KLD, infEntropy (Size: 1,n_bootstrap_itr)
        self.log_BME = log_BME
        self.KLD = KLD
        self.inf_entropy = inf_entropy

        # TODO: BMECorrFactor (log) (Size: 1,n_bootstrap_itr)
        # if self.emulator: self.BMECorrFactor = BME_Corr

        # BME = BME + BMECorrFactor
        if self.emulator:
            self.log_BME = self.log_BME  # + self.BMECorrFactor

    # ---------------- Parameter Bayesian inference ----------------
    if self.inference_method.lower() == 'mcmc':
        # Instantiate the MCMC object
        MCMC_Obj = MCMC(self)
        self.posterior_df = MCMC_Obj.run_sampler(
            self.measured_data, total_sigma2
            )

    elif self.name.lower() == 'valid':
        # Convert to a dataframe if samples are provided after calibration.
        self.posterior_df = pd.DataFrame(self.samples, columns=par_names)

    else:
        # Rejection sampling
        self.posterior_df = self._rejection_sampling()

    # Provide posterior's summary
    print('\n')
    print('-'*15 + 'Posterior summary' + '-'*15)
    pd.options.display.max_columns = None
    pd.options.display.max_rows = None
    print(self.posterior_df.describe())
    print('-'*50)

    # -------- Model Discrepancy -----------
    if hasattr(self, 'error_model') and self.error_model \
       and self.name.lower() == 'calib':
        if self.inference_method.lower() == 'mcmc':
            self.error_MetaModel = MCMC_Obj.error_MetaModel
        else:
            # Select posterior mean as MAP
            if opt_sigma == "B":
                posterior_df = self.posterior_df.values
            else:
                posterior_df = self.posterior_df.values[:, :-Model.n_outputs]

            # Select posterior mean as Maximum a posteriori
            map_theta = posterior_df.mean(axis=0).reshape((1, n_params))
            # map_theta = stats.mode(Posterior_df,axis=0)[0]

            # Evaluate the (meta-)model at the MAP
            y_MAP, y_std_MAP = MetaModel.eval_metamodel(samples=map_theta)

            # Train a GPR meta-model using MAP
            self.error_MetaModel = MetaModel.create_model_error(
                self.bias_inputs, y_MAP, Name=self.name
                )

    # -------- Posterior predictive -----------
    self._posterior_predictive()

    # -----------------------------------------------------
    # ------------------ Visualization --------------------
    # -----------------------------------------------------
    # Create Output directory, if it doesn't exist already.
    out_dir = f'Outputs_Bayes_{Model.name}_{self.name}'
    os.makedirs(out_dir, exist_ok=True)

    # -------- Posterior parameters --------
    if opt_sigma != "B":
        par_names.extend(
            [self.Discrepancy.InputDisc.Marginals[i].name for i
             in range(len(self.Discrepancy.InputDisc.Marginals))]
            )
    # Plot with corner
    figPosterior = corner.corner(self.posterior_df.to_numpy(),
                                 labels=par_names,
                                 quantiles=[0.15, 0.5, 0.85],
                                 show_titles=True,
                                 title_fmt=self.corner_title_fmt,
                                 labelpad=0.2,
                                 use_math_text=True,
                                 title_kwargs={"fontsize": 28},
                                 plot_datapoints=False,
                                 plot_density=False,
                                 fill_contours=True,
                                 smooth=0.5,
                                 smooth1d=0.5)

    # Loop over axes and set x limits
    if opt_sigma == "B":
        axes = np.array(figPosterior.axes).reshape(
            (len(par_names), len(par_names))
            )
        for yi in range(len(par_names)):
            ax = axes[yi, yi]
            ax.set_xlim(MetaModel.bound_tuples[yi])
            for xi in range(yi):
                ax = axes[yi, xi]
                ax.set_xlim(MetaModel.bound_tuples[xi])

    # Turn off gridlines
    for ax in figPosterior.axes:
        ax.grid(False)

    if self.emulator:
        plotname = f'/Posterior_Dist_{Model.name}_emulator'
    else:
        plotname = f'/Posterior_Dist_{Model.name}'

    figPosterior.set_size_inches((24, 16))
    figPosterior.savefig(f'./{out_dir}{plotname}.pdf',
                         bbox_inches='tight')

    # -------- Plot MAP --------
    if self.plot_map_pred:
        self._plot_max_a_posteriori()

    # -------- Plot log_BME dist --------
    if self.bootstrap and self.n_bootstrap_itrs > 1:
        # Computing the TOM performance
        self.log_BME_tom = stats.chi2.rvs(
            self.n_tot_measurement, size=self.log_BME.shape[0]
            )

        fig, ax = plt.subplots()
        sns.kdeplot(self.log_BME_tom, ax=ax, color="green", shade=True)
        sns.kdeplot(
            self.log_BME, ax=ax, color="blue", shade=True,
            label='Model BME')

        ax.set_xlabel('log(BME)')
        ax.set_ylabel('Probability density')

        legend_elements = [
            Patch(facecolor='green', edgecolor='green', label='TOM BME'),
            Patch(facecolor='blue', edgecolor='blue', label='Model BME')
            ]
        ax.legend(handles=legend_elements)

        if self.emulator:
            plotname = f'/BME_hist_{Model.name}_emulator'
        else:
            plotname = f'/BME_hist_{Model.name}'

        plt.savefig(f'./{out_dir}{plotname}.pdf', bbox_inches='tight')
        plt.show()
        plt.close()

    # -------- Posterior predictives --------
    if self.plot_post_pred:
        # Plot the posterior predictive
        self._plot_post_predictive()

    return self
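
A hedged usage sketch (assumptions: a trained MetaModel object named meta_model, a Discrepancy object named disc, and a constructor that takes the metamodel; the constructor call is hypothetical, while the attribute names follow those read in the source above):

from bayesvalidrox import BayesInference

bayes = BayesInference(meta_model)    # hypothetical constructor call
bayes.name = 'Calib'                  # 'Calib' or 'Valid'
bayes.emulator = True                 # evaluate the surrogate, not the model
bayes.n_samples = 10_000              # size of the prior sample
bayes.Discrepancy = disc              # measurement-error description
bayes.inference_method = 'rejection'  # or 'MCMC'

bayes = bayes.create_inference()      # runs the full inference
print(bayes.posterior_df.describe())  # posterior summary, as printed above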
def normpdf(self, outputs, obs_data, total_sigma2s, sigma2=None, std=None)

Calculates the log-likelihood of simulation outputs compared with observation data.

Parameters

outputs : dict
A dictionary containing the simulation outputs as array of shape (n_samples, n_measurement) for each model output.
obs_data : dict
A dictionary/dataframe containing the observation data.
total_sigma2s : dict
A dictionary with known values of the covariance diagonal entries, a.k.a sigma^2.
sigma2 : array, optional
An array of the sigma^2 samples, when the covariance diagonal entries are unknown and are being jointly inferred. The default is None.
std : dict, optional
A dictionary containing the root mean squared error as array of shape (n_samples, n_measurement) for each model output. The default is None.

Returns

logLik : array of shape (n_samples)
Log-likelihoods.
Expand source code
def normpdf(self, outputs, obs_data, total_sigma2s, sigma2=None, std=None):
    """
    Calculates the log-likelihood of simulation outputs compared with
    observation data.

    Parameters
    ----------
    outputs : dict
        A dictionary containing the simulation outputs as array of shape
        (n_samples, n_measurement) for each model output.
    obs_data : dict
        A dictionary/dataframe containing the observation data.
    total_sigma2s : dict
        A dictionary with known values of the covariance diagonal entries,
        a.k.a sigma^2.
    sigma2 : array, optional
        An array of the sigma^2 samples, when the covariance diagonal
        entries are unknown and are being jointly inferred. The default is
        None.
    std : dict, optional
        A dictionary containing the root mean squared error as array of
        shape (n_samples, n_measurement) for each model output. The default
        is None.

    Returns
    -------
    logLik : array of shape (n_samples)
        Log-likelihoods.

    """
    Model = self.MetaModel.ModelObj
    logLik = 0.0

    # Extract the requested model outputs for likelihood calculation
    if self.req_outputs is None:
        req_outputs = Model.Output.names
    else:
        req_outputs = list(self.req_outputs)

    # Loop over the outputs
    for idx, out in enumerate(req_outputs):

        # (Meta)Model Output
        nsamples, nout = outputs[out].shape

        # Prepare data and remove NaN
        try:
            data = obs_data[out].values[~np.isnan(obs_data[out])]
        except AttributeError:
            data = obs_data[out][~np.isnan(obs_data[out])]

        # Prepare sigma2s
        non_nan_indices = ~np.isnan(total_sigma2s[out])
        tot_sigma2s = total_sigma2s[out][non_nan_indices][:nout]

        # Add the variance of the PCE if it is chosen as the emulator.
        if self.emulator:
            if std is not None:
                std_pce = std[out]
            else:
                std_pce = np.mean(
                    self._std_pce_prior_pred[out], axis=0)
            # Expected value of variance (Assump: i.i.d stds)
            tot_sigma2s += std_pce**2

        # If sigma2 is not given, use given total_sigma2s
        if sigma2 is None:
            logLik += stats.multivariate_normal.logpdf(
                outputs[out], data, np.diag(tot_sigma2s))
            continue

        # Loop over each run/sample and calculate logLikelihood
        logliks = np.zeros(nsamples)
        for s_idx in range(nsamples):

            # Simulation run
            tot_outputs = outputs[out]

            # Covariance Matrix
            covMatrix = np.diag(tot_sigma2s)

            if sigma2 is not None:
                # Check the type error term
                if hasattr(self, 'bias_inputs') and \
                   not hasattr(self, 'error_model'):
                    # Infer a bias model using Gaussian Process Regression
                    bias_inputs = np.hstack(
                        (self.bias_inputs[out],
                         tot_outputs[s_idx].reshape(-1, 1)))

                    params = sigma2[s_idx, idx*3:(idx+1)*3]
                    covMatrix = self._kernel_rbf(bias_inputs, params)
                else:
                    # Infer equal sigma2s
                    try:
                        sigma_2 = sigma2[s_idx, idx]
                    except TypeError:
                        sigma_2 = 0.0

                    covMatrix += sigma_2 * np.eye(nout)
                    # covMatrix = np.diag(sigma2 * total_sigma2s)

            # Select the data points to compare
            if self.selected_indices is not None:
                indices = self.selected_indices[out]
                covMatrix = np.diag(covMatrix[indices, indices])
            else:
                indices = list(range(nout))

            # Compute loglikelihood
            logliks[s_idx] = self._logpdf(
                tot_outputs[s_idx, indices], data[indices], covMatrix
                )

        logLik += logliks
    return logLik
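
Editor's note: a minimal sketch of the Gaussian log-likelihood that normpdf accumulates per output for the known-sigma2 case, using scipy's multivariate_normal as in the source (toy numbers for illustration):

import numpy as np
from scipy import stats

# Toy setup: 3 measurement points, 2 simulation runs
data = np.array([1.0, 2.0, 3.0])            # observations
outputs = np.array([[1.1, 1.9, 3.2],
                    [0.7, 2.4, 2.8]])       # model/metamodel evaluations
tot_sigma2s = np.array([0.05, 0.05, 0.05])  # known error variances

# One log-likelihood per run: log N(data | run, diag(sigma2))
log_lik = stats.multivariate_normal.logpdf(
    outputs, mean=data, cov=np.diag(tot_sigma2s))
print(log_lik)  # array of shape (2,)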