Module bayesvalidrox.pylink

Expand source code
# -*- coding: utf-8 -*-

from .pylink import PyLinkForwardModel

__all__ = [
    "PyLinkForwardModel"
    ]

Sub-modules

bayesvalidrox.pylink.pylink

Classes

class PyLinkForwardModel (link_type='pylink', name=None, py_file=None, shell_command='', input_file=None, input_template=None, aux_file=None, exe_path='', output_file_names=[], output_names=[], output_parser='', multi_process=True, n_cpus=None, meas_file=None, meas_file_valid=None, mc_ref_file=None, obs_dict={}, obs_dict_valid={}, mc_ref_dict={})

A forward model binder

This class serves as a code wrapper. The wrapper allows the execution of a third-party software/solver within the scope of BayesValidRox.

Attributes

link_type : str
The type of the wrapper. The default is 'pylink', which runs the third-party software or executable using a shell command with the given input files. The second option is 'function', which assumes that the model can be run via a function written separately in a Python script.
name : str
Name of the model.
py_file : str
Python file name without the .py extension to be run for the 'function' wrapper. Note that the name of the Python file and that of the function must be identical. This function must receive the parameters as an array of shape (n_samples, n_params) and return a dictionary with the x_values and the output arrays for the given output names.
shell_command : str
Shell command to be executed for the 'pylink' wrapper.
input_file : str or list
The input file to be passed to the 'pylink' wrapper.
input_template : str or list
A template input file to be passed to the 'pylink' wrapper. This file must be a copy of input_file with <Xi> placeholders for the input parameters defined via the inputs class, with i being the parameter number. The file name should include .tpl before the actual extension of the input file, for example, params.tpl.input.
aux_file : str or list
The list of auxiliary files needed for the 'pylink' wrapper.
exe_path : str
Execution path if you wish to run the model for the 'pylink' wrapper in another directory. The default is None, which corresponds to the current working directory.
output_file_names : list of str
List of the names of the model output text files for the 'pylink' wrapper.
output_names : list of str
List of the model outputs to be used for the analysis.
output_parser : str
Name of the model parser file (without the .py extension) that receives the output_file_names and returns a 2D array whose first row contains the x_values, e.g. x coordinates or time, and whose remaining rows contain the simulation output for each model output defined in output_names. Note that, again, the name of the file and that of the function must be the same.
multi_process : bool
Whether the model runs are to be executed in parallel for the 'pylink' wrapper. The default is True.
n_cpus : int
The number of CPUs to be used for the parallel model execution for the 'pylink' wrapper. The default is None, which corresponds to all available CPUs.
meas_file : str
The name of the measurement text-based file. This file must contain x_values as the first column and one column for each model output. The default is None. Only needed for Bayesian inference.
meas_file_valid : str
The name of the measurement text-based file for the validation. The default is None. Only needed for the validation with Bayesian inference.
mc_ref_file : str
The name of the text file for the Monte-Carlo reference (mean and standard deviation) values. It must contain x_values as the first column, mean as the second column and std as the third. It can be used to compare the moments estimated by the meta-model in the post-processing step. This is only available for one output.
obs_dict : dict
A dictionary containing the measurement data. It must contain x_values as the first item and one item for each model output. The default is {}. Only needed for Bayesian inference.
obs_dict_valid : dict
A dictionary containing the validation measurement data. It must contain x_values as the first item and one item for each model output. The default is {}.
mc_ref_dict : dict
A dictionary containing the Monte-Carlo reference (mean and standard deviation) values. It must contain x_values as the first item, mean as the second item and std as the third. The default is {}. This is only available for one output.
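A minimal configuration sketch for both wrapper types is given below. The model names, file names, and output names are hypothetical and only illustrate how the attributes above fit together.

from bayesvalidrox.pylink import PyLinkForwardModel

# 'function' wrapper: the model lives in a Python file (here assumed to be
# my_model.py) defining a function my_model(samples) that accepts an array of
# shape (n_samples, n_params) and returns a dict with 'x_values' and one key
# per entry of output_names.
func_model = PyLinkForwardModel(
    link_type='function',
    name='my_model',
    py_file='my_model',
    output_names=['concentration'],
)

# 'pylink' wrapper: a third-party executable is driven through a shell
# command, a template input file with <X1>, <X2>, ... placeholders, and a
# parser module that reads the generated output files.
exe_model = PyLinkForwardModel(
    link_type='pylink',
    name='my_solver',
    shell_command='./my_solver params.input',
    input_file='params.input',
    input_template='params.tpl.input',
    output_file_names=['results.csv'],
    output_names=['concentration'],
    output_parser='read_results',
)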
Expand source code
class PyLinkForwardModel(object):
    """
    A forward model binder

    This class serves as a code wrapper. The wrapper allows the execution of
    a third-party software/solver within the scope of BayesValidRox.

    Attributes
    ----------
    link_type : str
        The type of the wrapper. The default is `'pylink'`, which runs the
        third-party software or executable using a shell command with the
        given input files.
        The second option is `'function'`, which assumes that the model can
        be run via a function written separately in a Python script.
    name : str
        Name of the model.
    py_file : str
        Python file name without the `.py` extension to be run for the
        `'function'` wrapper. Note that the name of the Python file and that
        of the function must be identical. This function must receive the
        parameters as an array of shape `(n_samples, n_params)` and return a
        dictionary with the x_values and the output arrays for the given
        output names.
    shell_command : str
        Shell command to be executed for the `'pylink'` wrapper.
    input_file : str or list
        The input file to be passed to the `'pylink'` wrapper.
    input_template : str or list
        A template input file to be passed to the `'pylink'` wrapper. This
        file must be a copy of `input_file` with `<Xi>` placeholders for the
        input parameters defined via the `inputs` class, with i being the
        parameter number. The file name should include `.tpl` before the
        actual extension of the input file, for example, `params.tpl.input`.
    aux_file : str or list
        The list of auxiliary files needed for the `'pylink'` wrapper.
    exe_path : str
        Execution path if you wish to run the model for the `'pylink'` wrapper
        in another directory. The default is `None`, which corresponds to the
        current working directory.
    output_file_names : list of str
        List of the names of the model output text files for the `'pylink'`
        wrapper.
    output_names : list of str
        List of the model outputs to be used for the analysis.
    output_parser : str
        Name of the model parser file (without the `.py` extension) that
        receives the `output_file_names` and returns a 2D array whose first
        row contains the x_values, e.g. x coordinates or time, and whose
        remaining rows contain the simulation output for each model output
        defined in `output_names`. Note that, again, the name of the file and
        that of the function must be the same.
    multi_process : bool
        Whether the model runs are to be executed in parallel for the
        `'pylink'` wrapper. The default is `True`.
    n_cpus : int
        The number of CPUs to be used for the parallel model execution for
        the `'pylink'` wrapper. The default is `None`, which corresponds to
        all available CPUs.
    meas_file : str
        The name of the measurement text-based file. This file must contain
        x_values as the first column and one column for each model output.
        The default is `None`. Only needed for Bayesian inference.
    meas_file_valid : str
        The name of the measurement text-based file for the validation. The
        default is `None`. Only needed for the validation with Bayesian
        inference.
    mc_ref_file : str
        The name of the text file for the Monte-Carlo reference (mean and
        standard deviation) values. It must contain `x_values` as the first
        column, `mean` as the second column and `std` as the third. It can be
        used to compare the moments estimated by the meta-model in the
        post-processing step. This is only available for one output.
    obs_dict : dict
        A dictionary containing the measurement data. It must contain
        `x_values` as the first item and one item for each model output. The
        default is `{}`. Only needed for Bayesian inference.
    obs_dict_valid : dict
        A dictionary containing the validation measurement data. It must
        contain `x_values` as the first item and one item for each model
        output. The default is `{}`.
    mc_ref_dict : dict
        A dictionary containing the Monte-Carlo reference (mean and standard
        deviation) values. It must contain `x_values` as the first item,
        `mean` as the second item and `std` as the third. The default is `{}`.
        This is only available for one output.
    """

    def __init__(self, link_type='pylink', name=None, py_file=None,
                 shell_command='', input_file=None, input_template=None,
                 aux_file=None, exe_path='', output_file_names=[],
                 output_names=[], output_parser='', multi_process=True,
                 n_cpus=None, meas_file=None, meas_file_valid=None,
                 mc_ref_file=None, obs_dict={}, obs_dict_valid={},
                 mc_ref_dict={}):
        self.link_type = link_type
        self.name = name
        self.shell_command = shell_command
        self.py_file = py_file
        self.input_file = input_file
        self.input_template = input_template
        self.aux_file = aux_file
        self.exe_path = exe_path
        self.multi_process = multi_process
        self.n_cpus = n_cpus
        self.Output.parser = output_parser
        self.Output.names = output_names
        self.Output.file_names = output_file_names
        self.meas_file = meas_file
        self.meas_file_valid = meas_file_valid
        self.mc_ref_file = mc_ref_file
        self.observations = obs_dict
        self.observations_valid = obs_dict_valid
        self.mc_reference = mc_ref_dict

    # Nested class
    class Output:
        def __init__(self):
            self.parser = None
            self.names = None
            self.file_names = None

    # -------------------------------------------------------------------------
    def within_range(self, out, minout, maxout):
        inside = False
        if (out > minout).all() and (out < maxout).all():
            inside = True
        return inside

    # -------------------------------------------------------------------------
    def read_observation(self, case='calib'):
        """
        Reads/prepares the observation/measurement data for
        calibration.

        Returns
        -------
        DataFrame
            A dataframe with the calibration data.

        """
        if case.lower() == 'calib':
            if isinstance(self.observations, dict) and bool(self.observations):
                obs = pd.DataFrame.from_dict(self.observations)
            elif self.meas_file is not None:
                file_path = os.path.join(os.getcwd(), self.meas_file)
                obs = pd.read_csv(file_path, delimiter=',')
            elif isinstance(self.observations, pd.DataFrame):
                obs = self.observations
            else:
                raise Exception("Please provide the observation data as a "
                                "dictionary via observations attribute or pass"
                                " the csv-file path to MeasurementFile "
                                "attribute")
        elif case.lower() == 'valid':
            if isinstance(self.observations_valid, dict) and \
              bool(self.observations_valid):
                obs = pd.DataFrame.from_dict(self.observations_valid)
            elif self.meas_file_valid is not None:
                file_path = os.path.join(os.getcwd(), self.meas_file_valid)
                obs = pd.read_csv(file_path, delimiter=',')
            elif isinstance(self.observations_valid, pd.DataFrame):
                obs = self.observations_valid
            else:
                raise Exception("Please provide the observation data as a "
                                "dictionary via Observations attribute or pass"
                                " the csv-file path to MeasurementFile "
                                "attribute")

        # Compute the number of observations
        n_obs = obs[self.Output.names].notnull().sum().values.sum()

        if case.lower() == 'calib':
            self.observations = obs
            self.n_obs = n_obs
            return self.observations
        elif case.lower() == 'valid':
            self.observations_valid = obs
            self.n_obs_valid = n_obs
            return self.observations_valid

    # -------------------------------------------------------------------------
    def read_mc_reference(self):
        """
        Reads the Monte-Carlo reference data, if available, for further
        in-depth post-processing after meta-model training.

        Returns
        -------
        None

        """
        if self.mc_ref_file is None and not hasattr(self, 'mc_reference'):
            return
        elif isinstance(self.mc_reference, dict) and bool(self.mc_reference):
            self.mc_reference = pd.DataFrame.from_dict(self.mc_reference)
        elif self.mc_ref_file is not None:
            file_path = os.path.join(os.getcwd(), self.mc_ref_file)
            self.mc_reference = pd.read_csv(file_path, delimiter=',')
        else:
            raise Exception("Please provide the MC reference data as a "
                            "dictionary via mc_reference attribute or pass the"
                            " csv-file path to mc_ref_file attribute")
        return self.mc_reference

    # -------------------------------------------------------------------------
    def read_output(self):
        """
        Imports the output parser and uses it to read the simulation results
        from the output files. It is required when the model returns the
        simulation outputs in CSV files.

        Returns
        -------
        output : array
            Simulation outputs read by the output parser.

        """
        output_func_name = self.Output.parser

        output_func = getattr(__import__(output_func_name), output_func_name)

        file_names = []
        for File in self.Output.file_names:
            file_names.append(os.path.join(self.exe_path, File))
        try:
            output = output_func(self.name, file_names)
        except TypeError:
            output = output_func(file_names)
        return output

    # -------------------------------------------------------------------------
    def update_input_params(self, new_input_file, param_set):
        """
        Finds the <Xi> placeholders in the files listed in new_input_file and
        replaces them with the corresponding values from param_set.

        Parameters
        ----------
        new_input_file : list
            List of the input files with the adapted names.
        param_set : array of shape (n_params)
            Parameter set.

        Returns
        -------
        None.

        """
        NofPa = param_set.shape[0]
        text_to_search_list = [f'<X{i+1}>' for i in range(NofPa)]

        for filename in new_input_file:
            # Read in the file
            with open(filename, 'r') as file:
                filedata = file.read()

            # Replace the target string
            for text_to_search, params in zip(text_to_search_list, param_set):
                filedata = filedata.replace(text_to_search, f'{params:0.4e}')

            # Write the file out again
            with open(filename, 'w') as file:
                file.write(filedata)

    # -------------------------------------------------------------------------
    def run_command(self, command, output_file_names):
        """
        Runs the execution command given by the user to run the model. It
        checks whether the output files have been generated. If so, the job is
        done and it extracts and returns the requested output(s). Otherwise,
        it executes the command again.

        Parameters
        ----------
        command : str
            The shell command to be executed.
        output_file_names : list
            List of the expected output file names.

        Returns
        -------
        simulation_outputs : array of shape (n_obs, n_outputs)
            Simulation outputs.

        """

        # Check if simulation is finished
        while True:
            time.sleep(3)
            files = os.listdir(".")
            if all(elem in files for elem in output_file_names):
                break
            else:
                # Run command
                Process = os.system(f'./../{command}')
                if Process != 0:
                    print('\nMessage 1:')
                    print(f'\tThe return value \'{Process}\' is non-zero, '
                          'which indicates compilation/execution problems.\n')

        os.chdir("..")

        # Read the output
        simulation_outputs = self.read_output()

        return simulation_outputs

    # -------------------------------------------------------------------------
    def run_forwardmodel(self, xx):
        """
        Creates a subdirectory for the current run, copies the necessary
        files to this directory, and renames them. Next, it executes the
        given command.

        Parameters
        ----------
        xx : tuple
            A tuple including parameter set, simulation number and key string.

        Returns
        -------
        output : array of shape (n_outputs+1, n_obs)
            An array returned by the output parser containing the x_values as
            the first row and the simulation results in the remaining rows.

        """
        c_points, run_no, key_str = xx

        # Handle the case when only one input file is provided
        if not isinstance(self.input_template, list):
            self.input_template = [self.input_template]
        if not isinstance(self.input_file, list):
            self.input_file = [self.input_file]

        new_input_file = []
        # Loop over the InputTemplates:
        for in_temp in self.input_template:
            if '/' in in_temp:
                in_temp = in_temp.split('/')[-1]
            new_input_file.append(in_temp.split('.tpl')[0] + key_str +
                                  f"_{run_no+1}" + in_temp.split('.tpl')[1])

        # Create directories
        newpath = self.name + key_str + f'_{run_no+1}'
        if not os.path.exists(newpath):
            os.makedirs(newpath)

        # Copy the necessary files to the directories
        for in_temp in self.input_template:
            # Input file(s) of the model
            shutil.copy2(in_temp, newpath)
        # Auxiliary file
        if self.aux_file is not None:
            shutil.copy2(self.aux_file, newpath)  # Auxiliary file

        # Rename the Inputfile and/or auxiliary file
        os.chdir(newpath)
        for input_tem, input_file in zip(self.input_template, new_input_file):
            if '/' in input_tem:
                input_tem = input_tem.split('/')[-1]
            os.rename(input_tem, input_file)

        # Update the parameters in the input file
        self.update_input_params(new_input_file, c_points)

        # Update the user defined command and the execution path
        try:
            new_command = self.shell_command.replace(self.input_file[0],
                                                     new_input_file[0])
            new_command = new_command.replace(self.input_file[1],
                                              new_input_file[1])
        except:
            new_command = self.shell_command.replace(self.input_file[0],
                                                     new_input_file[0])
        # Set the exe path if not provided
        if not bool(self.exe_path):
            self.exe_path = os.getcwd()

        # Run the model
        output = self.run_command(new_command, self.Output.file_names)

        return output

    # -------------------------------------------------------------------------
    def run_model_parallel(self, c_points, prevRun_No=0, key_str='',
                           mp=True):
        """
        Runs model simulations. If mp is True (the default), the simulations
        are started in parallel.

        Parameters
        ----------
        c_points : array of shape (n_samples, n_params)
            Collocation points (training set).
        prevRun_No : int, optional
            Previous run number, in case the sequential design is selected.
            The default is `0`.
        key_str : str, optional
            A descriptive string for validation runs. The default is `''`.
        mp : bool, optional
            Multiprocessing. The default is `True`.

        Returns
        -------
        all_outputs : dict
            A dictionary with x values (time step or point id) and all outputs.
            Each key contains an array of the shape `(n_samples, n_obs)`.
        new_c_points : array
            Updated collocation points (training set). If a simulation did
            not execute successfully, the corresponding parameter set is
            removed.

        """

        # Create hdf5 metadata
        hdf5file = f'ExpDesign_{self.name}.hdf5'
        hdf5_exist = os.path.exists(hdf5file)
        file = h5py.File(hdf5file, 'a')

        # Initialization
        n_c_points = len(c_points)
        self.n_outputs = len(self.Output.names)
        all_outputs = {}

        # Extract the function
        if self.link_type.lower() == 'function':
            # Prepare the function
            Function = getattr(__import__(self.py_file), self.py_file)
        # ---------------------------------------------------------------
        # -------------- Multiprocessing with Pool Class ----------------
        # ---------------------------------------------------------------
        # Start a pool with the number of CPUs
        if self.n_cpus is None:
            n_cpus = multiprocessing.cpu_count()
        else:
            n_cpus = self.n_cpus

        # Run forward model either normal or with multiprocessing
        if not self.multi_process:
            group_results = list([self.run_forwardmodel((c_points,
                                                         prevRun_No,
                                                         key_str))])
        else:
            with multiprocessing.Pool(n_cpus) as p:
                desc = f'Running forward model {key_str}'
                if self.link_type.lower() == 'function':
                    imap_var = p.imap(Function, c_points[:, np.newaxis])
                else:
                    args = zip(c_points,
                               [prevRun_No+i for i in range(n_c_points)],
                               [key_str]*n_c_points)
                    imap_var = p.imap(self.run_forwardmodel, args)

                group_results = list(tqdm.tqdm(imap_var, total=n_c_points,
                                               desc=desc))

        # Save time steps or x-values
        x_values = group_results[0][0]
        all_outputs["x_values"] = x_values
        if not hdf5_exist:
            if type(x_values) is dict:
                grp_x_values = file.create_group("x_values/")
                for varIdx, var in enumerate(self.Output.names):
                    grp_x_values.create_dataset(var, data=x_values[var])
            else:
                file.create_dataset("x_values", data=x_values)

        # save each output in their corresponding array
        NaN_idx = []
        for varIdx, var in enumerate(self.Output.names):

            if not hdf5_exist:
                grpY = file.create_group("EDY/"+var)
            else:
                grpY = file.get("EDY/"+var)

            Outputs = np.asarray([item[varIdx+1] for item in group_results],
                                 dtype=np.float64)

            if prevRun_No == 0 and key_str == '':
                grpY.create_dataset(f'init_{key_str}', data=Outputs)
            else:
                try:
                    oldEDY = np.array(file[f'EDY/{var}/adaptive_{key_str}'])
                    del file[f'EDY/{var}/adaptive_{key_str}']
                    data = np.vstack((oldEDY, Outputs))
                except KeyError:
                    data = Outputs
                grpY.create_dataset('adaptive_'+key_str, data=data)

            NaN_idx = np.unique(np.argwhere(np.isnan(Outputs))[:, 0])
            all_outputs[var] = np.delete(Outputs, NaN_idx, axis=0)

            if prevRun_No == 0 and key_str == '':
                grpY.create_dataset(f"New_init_{key_str}",
                                    data=all_outputs[var])
            else:
                try:
                    name = f'EDY/{var}/New_adaptive_{key_str}'
                    oldEDY = np.array(file[name])
                    del file[f'EDY/{var}/New_adaptive_{key_str}']
                    data = np.vstack((oldEDY, all_outputs[var]))
                except KeyError:
                    data = all_outputs[var]
                grpY.create_dataset(f'New_adaptive_{key_str}', data=data)

        # Print the collocation points whose simulations crashed
        if len(NaN_idx) != 0:
            print('\n')
            print('*'*20)
            print("\nThe following parametersets have been removed:\n",
                  c_points[NaN_idx])
            print("\n")
            print('*'*20)

        # Pass it to the attribute
        new_c_points = np.delete(c_points, NaN_idx, axis=0)
        self.OutputMatrix = all_outputs

        # Save CollocationPoints
        grpX = file.create_group("EDX") if not hdf5_exist else file.get("EDX")
        if prevRun_No == 0 and key_str == '':
            grpX.create_dataset("init_"+key_str, data=c_points)
            if len(NaN_idx) != 0:
                grpX.create_dataset("New_init_"+key_str, data=new_c_points)

        else:
            try:
                name = f'EDX/adaptive_{key_str}'
                oldCollocationPoints = np.array(file[name])
                del file[f'EDX/adaptive_{key_str}']
                data = np.vstack((oldCollocationPoints, new_c_points))
            except KeyError:
                data = new_c_points
            grpX.create_dataset('adaptive_'+key_str, data=data)

            if len(NaN_idx) != 0:
                try:
                    name = f'EDX/New_adaptive_{key_str}'
                    oldCollocationPoints = np.array(file[name])
                    del file[f'EDX/New_adaptive_{key_str}']
                    data = np.vstack((oldCollocationPoints, new_c_points))
                except KeyError:
                    data = new_c_points
                grpX.create_dataset('New_adaptive_'+key_str, data=data)

        # Close h5py file
        file.close()

        return all_outputs, new_c_points

    # -------------------------------------------------------------------------
    def zip_subdirs(self, dir_name, key):
        """
        Zips the contents of all subdirectories whose names contain the
        keyword and removes those subdirectories afterwards.

        Parameters
        ----------
        dir_name : str
            Directory name.
        key : str
            Keyword to search for.

        Returns
        -------
        None.

        """
        # setup file paths variable
        dir_list = []
        file_paths = []

        # Read all directory, subdirectories and file lists
        dir_path = os.getcwd()

        for root, directories, files in os.walk(dir_path):
            for directory in directories:
                # Create the full filepath by using os module.
                if key in directory:
                    folderPath = os.path.join(dir_path, directory)
                    dir_list.append(folderPath)

        # Loop over the identified directories to store the file paths
        for direct_name in dir_list:
            for root, directories, files in os.walk(direct_name):
                for filename in files:
                    # Create the full filepath by using os module.
                    filePath = os.path.join(root, filename)
                    file_paths.append('.'+filePath.split(dir_path)[1])

        # writing files to a zipfile
        if len(file_paths) != 0:
            zip_file = zipfile.ZipFile(dir_name+'.zip', 'w')
            with zip_file:
                # writing each file one by one
                for file in file_paths:
                    zip_file.write(file)

            file_paths = [path for path in os.listdir('.') if key in path]

            for path in file_paths:
                shutil.rmtree(path)

            print("\n")
            print(f'{dir_name}.zip file has been created successfully!\n')

        return

Class variables

var Output

Methods

def within_range(self, out, minout, maxout)
Expand source code
def within_range(self, out, minout, maxout):
    inside = False
    if (out > minout).all() and (out < maxout).all():
        inside = True
    return inside
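A minimal sketch of within_range, assuming out and the bounds are NumPy arrays of the same shape; the model name is hypothetical.

import numpy as np
from bayesvalidrox.pylink import PyLinkForwardModel

model = PyLinkForwardModel(name='my_model')
out = np.array([0.2, 0.5, 0.8])
# True only if every entry of out lies strictly between the two bounds
print(model.within_range(out, np.zeros(3), np.ones(3)))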
def read_observation(self, case='calib')

Reads/prepares the observation/measurement data for calibration.

Returns

DataFrame
A dataframe with the calibration data.
Expand source code
def read_observation(self, case='calib'):
    """
    Reads/prepares the observation/measurement data for
    calibration.

    Returns
    -------
    DataFrame
        A dataframe with the calibration data.

    """
    if case.lower() == 'calib':
        if isinstance(self.observations, dict) and bool(self.observations):
            obs = pd.DataFrame.from_dict(self.observations)
        elif self.meas_file is not None:
            file_path = os.path.join(os.getcwd(), self.meas_file)
            obs = pd.read_csv(file_path, delimiter=',')
        elif isinstance(self.observations, pd.DataFrame):
            obs = self.observations
        else:
            raise Exception("Please provide the observation data as a "
                            "dictionary via observations attribute or pass"
                            " the csv-file path to MeasurementFile "
                            "attribute")
    elif case.lower() == 'valid':
        if isinstance(self.observations_valid, dict) and \
          bool(self.observations_valid):
            obs = pd.DataFrame.from_dict(self.observations_valid)
        elif self.meas_file_valid is not None:
            file_path = os.path.join(os.getcwd(), self.meas_file_valid)
            obs = pd.read_csv(file_path, delimiter=',')
        elif isinstance(self.observations_valid, pd.DataFrame):
            obs = self.observations_valid
        else:
            raise Exception("Please provide the observation data as a "
                            "dictionary via Observations attribute or pass"
                            " the csv-file path to MeasurementFile "
                            "attribute")

    # Compute the number of observations
    n_obs = obs[self.Output.names].notnull().sum().values.sum()

    if case.lower() == 'calib':
        self.observations = obs
        self.n_obs = n_obs
        return self.observations
    elif case.lower() == 'valid':
        self.observations_valid = obs
        self.n_obs_valid = n_obs
        return self.observations_valid
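A usage sketch, assuming the calibration data are supplied directly through obs_dict; the column names and values below are hypothetical.

from bayesvalidrox.pylink import PyLinkForwardModel

model = PyLinkForwardModel(
    name='my_model',
    output_names=['concentration'],
    obs_dict={
        'x_values': [0.0, 1.0, 2.0],
        'concentration': [0.11, 0.27, 0.41],
    },
)
obs_df = model.read_observation(case='calib')  # returned as a pandas DataFrame
print(model.n_obs)                             # 3 non-null observations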
def read_mc_reference(self)

Reads the Monte-Carlo reference data, if available, for further in-depth post-processing after meta-model training.

Returns

None
 
Expand source code
def read_mc_reference(self):
    """
    Reads the Monte-Carlo reference data, if available, for further
    in-depth post-processing after meta-model training.

    Returns
    -------
    None

    """
    if self.mc_ref_file is None and not hasattr(self, 'mc_reference'):
        return
    elif isinstance(self.mc_reference, dict) and bool(self.mc_reference):
        self.mc_reference = pd.DataFrame.from_dict(self.mc_reference)
    elif self.mc_ref_file is not None:
        file_path = os.path.join(os.getcwd(), self.mc_ref_file)
        self.mc_reference = pd.read_csv(file_path, delimiter=',')
    else:
        raise Exception("Please provide the MC reference data as a "
                        "dictionary via mc_reference attribute or pass the"
                        " csv-file path to mc_ref_file attribute")
    return self.mc_reference
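A sketch of supplying the Monte-Carlo reference through mc_ref_dict; the values are hypothetical, and recall that this is only available for a single output.

from bayesvalidrox.pylink import PyLinkForwardModel

model = PyLinkForwardModel(
    name='my_model',
    output_names=['concentration'],
    mc_ref_dict={
        'x_values': [0.0, 1.0, 2.0],
        'mean': [0.10, 0.25, 0.40],
        'std': [0.01, 0.02, 0.02],
    },
)
mc_df = model.read_mc_reference()  # converted to a pandas DataFrame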
def read_output(self)

Imports the output parser and uses it to read the simulation results from the output files. It is required when the model returns the simulation outputs in CSV files.

Returns

output : array
Simulation outputs read by the output parser.
Expand source code
def read_output(self):
    """
    Imports the output parser and uses it to read the simulation results
    from the output files. It is required when the model returns the
    simulation outputs in CSV files.

    Returns
    -------
    output : array
        Simulation outputs read by the output parser.

    """
    output_func_name = self.Output.parser

    output_func = getattr(__import__(output_func_name), output_func_name)

    file_names = []
    for File in self.Output.file_names:
        file_names.append(os.path.join(self.exe_path, File))
    try:
        output = output_func(self.name, file_names)
    except TypeError:
        output = output_func(file_names)
    return output
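A sketch of what a parser module could look like, e.g. a hypothetical file read_results.py defining a function of the same name; the file name, function name and column layout are assumptions. read_output imports the module named in output_parser and calls the function with the list of output file paths (the try/except above also allows a parser that takes the model name as a first argument).

# read_results.py (hypothetical parser module; must be importable from the
# working directory and share its name with the function it defines)
import numpy as np
import pandas as pd


def read_results(file_names):
    """Return a 2D array: first row x_values, one further row per output."""
    df = pd.read_csv(file_names[0])
    x_values = df['time'].values
    concentration = df['concentration'].values
    return np.vstack((x_values, concentration))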
def update_input_params(self, new_input_file, param_set)

Finds the <Xi> placeholders in the files listed in new_input_file and replaces them with the corresponding values from param_set.

Parameters

new_input_file : list
List of the input files with the adapted names.
param_set : array of shape (n_params)
Parameter set.

Returns

None.

Expand source code
def update_input_params(self, new_input_file, param_set):
    """
    Finds the <Xi> placeholders in the files listed in new_input_file and
    replaces them with the corresponding values from param_set.

    Parameters
    ----------
    new_input_file : list
        List of the input files with the adapted names.
    param_set : array of shape (n_params)
        Parameter set.

    Returns
    -------
    None.

    """
    NofPa = param_set.shape[0]
    text_to_search_list = [f'<X{i+1}>' for i in range(NofPa)]

    for filename in new_input_file:
        # Read in the file
        with open(filename, 'r') as file:
            filedata = file.read()

        # Replace the target string
        for text_to_search, params in zip(text_to_search_list, param_set):
            filedata = filedata.replace(text_to_search, f'{params:0.4e}')

        # Write the file out again
        with open(filename, 'w') as file:
            file.write(filedata)
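A small self-contained sketch of the substitution performed by update_input_params; the input file name and parameter values are hypothetical.

import numpy as np
from bayesvalidrox.pylink import PyLinkForwardModel

model = PyLinkForwardModel(name='my_model')

# A toy input file with two placeholders
with open('params_1.input', 'w') as f:
    f.write('permeability = <X1>\nporosity     = <X2>\n')

model.update_input_params(['params_1.input'], np.array([1.2e-10, 0.35]))

with open('params_1.input') as f:
    print(f.read())
# permeability = 1.2000e-10
# porosity     = 3.5000e-01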
def run_command(self, command, output_file_names)

Runs the execution command given by the user to run the model. It checks whether the output files have been generated. If so, the job is done and it extracts and returns the requested output(s). Otherwise, it executes the command again.

Parameters

command : str
The shell command to be executed.
output_file_names : list
List of the expected output file names.

Returns

simulation_outputs : array of shape (n_obs, n_outputs)
Simulation outputs.
Expand source code
def run_command(self, command, output_file_names):
    """
    Runs the execution command given by the user to run the model. It
    checks whether the output files have been generated. If so, the job is
    done and it extracts and returns the requested output(s). Otherwise,
    it executes the command again.

    Parameters
    ----------
    command : str
        The shell command to be executed.
    output_file_names : list
        List of the expected output file names.

    Returns
    -------
    simulation_outputs : array of shape (n_obs, n_outputs)
        Simulation outputs.

    """

    # Check if simulation is finished
    while True:
        time.sleep(3)
        files = os.listdir(".")
        if all(elem in files for elem in output_file_names):
            break
        else:
            # Run command
            Process = os.system(f'./../{command}')
            if Process != 0:
                print('\nMessage 1:')
                print(f'\tThe return value \'{Process}\' is non-zero, '
                      'which indicates compilation/execution problems.\n')

    os.chdir("..")

    # Read the output
    simulation_outputs = self.read_output()

    return simulation_outputs
def run_forwardmodel(self, xx)

Creates a subdirectory for the current run, copies the necessary files to this directory, and renames them. Next, it executes the given command.

Parameters

xx : tuple
A tuple including parameter set, simulation number and key string.

Returns

output : array of shape (n_outputs+1, n_obs)
An array returned by the output parser containing the x_values as the first row and the simulation results in the remaining rows.
Expand source code
def run_forwardmodel(self, xx):
    """
    Creates a subdirectory for the current run, copies the necessary files
    to this directory, and renames them. Next, it executes the given
    command.

    Parameters
    ----------
    xx : tuple
        A tuple including parameter set, simulation number and key string.

    Returns
    -------
    output : array of shape (n_outputs+1, n_obs)
        An array returned by the output parser containing the x_values as
        the first row and the simulation results in the remaining rows.

    """
    c_points, run_no, key_str = xx

    # Handle the case when only one input file is provided
    if not isinstance(self.input_template, list):
        self.input_template = [self.input_template]
    if not isinstance(self.input_file, list):
        self.input_file = [self.input_file]

    new_input_file = []
    # Loop over the InputTemplates:
    for in_temp in self.input_template:
        if '/' in in_temp:
            in_temp = in_temp.split('/')[-1]
        new_input_file.append(in_temp.split('.tpl')[0] + key_str +
                              f"_{run_no+1}" + in_temp.split('.tpl')[1])

    # Create directories
    newpath = self.name + key_str + f'_{run_no+1}'
    if not os.path.exists(newpath):
        os.makedirs(newpath)

    # Copy the necessary files to the directories
    for in_temp in self.input_template:
        # Input file(s) of the model
        shutil.copy2(in_temp, newpath)
    # Auxiliary file
    if self.aux_file is not None:
        shutil.copy2(self.aux_file, newpath)  # Auxiliary file

    # Rename the Inputfile and/or auxiliary file
    os.chdir(newpath)
    for input_tem, input_file in zip(self.input_template, new_input_file):
        if '/' in input_tem:
            input_tem = input_tem.split('/')[-1]
        os.rename(input_tem, input_file)

    # Update the parameters in the input file
    self.update_input_params(new_input_file, c_points)

    # Update the user defined command and the execution path
    try:
        new_command = self.shell_command.replace(self.input_file[0],
                                                 new_input_file[0])
        new_command = new_command.replace(self.input_file[1],
                                          new_input_file[1])
    except:
        new_command = self.shell_command.replace(self.input_file[0],
                                                 new_input_file[0])
    # Set the exe path if not provided
    if not bool(self.exe_path):
        self.exe_path = os.getcwd()

    # Run the model
    output = self.run_command(new_command, self.Output.file_names)

    return output
def run_model_parallel(self, c_points, prevRun_No=0, key_str='', mp=True)

Runs model simulations. If mp is True (the default), the simulations are started in parallel.

Parameters

c_points : array of shape (n_samples, n_params)
Collocation points (training set).
prevRun_No : int, optional
Previous run number, in case the sequential design is selected. The default is 0.
key_str : str, optional
A descriptive string for validation runs. The default is ''.
mp : bool, optional
Multiprocessing. The default is True.

Returns

all_outputs : dict
A dictionary with x values (time step or point id) and all outputs. Each key contains an array of the shape (n_samples, n_obs).
new_c_points : array
Updated collocation points (training set). If a simulation did not execute successfully, the corresponding parameter set is removed.
Expand source code
def run_model_parallel(self, c_points, prevRun_No=0, key_str='',
                       mp=True):
    """
    Runs model simulations. If mp is True (the default), the simulations
    are started in parallel.

    Parameters
    ----------
    c_points : array of shape (n_samples, n_params)
        Collocation points (training set).
    prevRun_No : int, optional
        Previous run number, in case the sequential design is selected.
        The default is `0`.
    key_str : str, optional
        A descriptive string for validation runs. The default is `''`.
    mp : bool, optional
        Multiprocessing. The default is `True`.

    Returns
    -------
    all_outputs : dict
        A dictionary with x values (time step or point id) and all outputs.
        Each key contains an array of the shape `(n_samples, n_obs)`.
    new_c_points : array
        Updated collocation points (training set). If a simulation did not
        execute successfully, the corresponding parameter set is removed.

    """

    # Create hdf5 metadata
    hdf5file = f'ExpDesign_{self.name}.hdf5'
    hdf5_exist = os.path.exists(hdf5file)
    file = h5py.File(hdf5file, 'a')

    # Initialization
    n_c_points = len(c_points)
    self.n_outputs = len(self.Output.names)
    all_outputs = {}

    # Extract the function
    if self.link_type.lower() == 'function':
        # Prepare the function
        Function = getattr(__import__(self.py_file), self.py_file)
    # ---------------------------------------------------------------
    # -------------- Multiprocessing with Pool Class ----------------
    # ---------------------------------------------------------------
    # Start a pool with the number of CPUs
    if self.n_cpus is None:
        n_cpus = multiprocessing.cpu_count()
    else:
        n_cpus = self.n_cpus

    # Run forward model either normal or with multiprocessing
    if not self.multi_process:
        group_results = list([self.run_forwardmodel((c_points,
                                                     prevRun_No,
                                                     key_str))])
    else:
        with multiprocessing.Pool(n_cpus) as p:
            desc = f'Running forward model {key_str}'
            if self.link_type.lower() == 'function':
                imap_var = p.imap(Function, c_points[:, np.newaxis])
            else:
                args = zip(c_points,
                           [prevRun_No+i for i in range(n_c_points)],
                           [key_str]*n_c_points)
                imap_var = p.imap(self.run_forwardmodel, args)

            group_results = list(tqdm.tqdm(imap_var, total=n_c_points,
                                           desc=desc))

    # Save time steps or x-values
    x_values = group_results[0][0]
    all_outputs["x_values"] = x_values
    if not hdf5_exist:
        if type(x_values) is dict:
            grp_x_values = file.create_group("x_values/")
            for varIdx, var in enumerate(self.Output.names):
                grp_x_values.create_dataset(var, data=x_values[var])
        else:
            file.create_dataset("x_values", data=x_values)

    # save each output in their corresponding array
    NaN_idx = []
    for varIdx, var in enumerate(self.Output.names):

        if not hdf5_exist:
            grpY = file.create_group("EDY/"+var)
        else:
            grpY = file.get("EDY/"+var)

        Outputs = np.asarray([item[varIdx+1] for item in group_results],
                             dtype=np.float64)

        if prevRun_No == 0 and key_str == '':
            grpY.create_dataset(f'init_{key_str}', data=Outputs)
        else:
            try:
                oldEDY = np.array(file[f'EDY/{var}/adaptive_{key_str}'])
                del file[f'EDY/{var}/adaptive_{key_str}']
                data = np.vstack((oldEDY, Outputs))
            except KeyError:
                data = Outputs
            grpY.create_dataset('adaptive_'+key_str, data=data)

        NaN_idx = np.unique(np.argwhere(np.isnan(Outputs))[:, 0])
        all_outputs[var] = np.delete(Outputs, NaN_idx, axis=0)

        if prevRun_No == 0 and key_str == '':
            grpY.create_dataset(f"New_init_{key_str}",
                                data=all_outputs[var])
        else:
            try:
                name = f'EDY/{var}/New_adaptive_{key_str}'
                oldEDY = np.array(file[name])
                del file[f'EDY/{var}/New_adaptive_{key_str}']
                data = np.vstack((oldEDY, all_outputs[var]))
            except KeyError:
                data = all_outputs[var]
            grpY.create_dataset(f'New_adaptive_{key_str}', data=data)

    # Print the collocation points whose simulations crashed
    if len(NaN_idx) != 0:
        print('\n')
        print('*'*20)
        print("\nThe following parametersets have been removed:\n",
              c_points[NaN_idx])
        print("\n")
        print('*'*20)

    # Pass it to the attribute
    new_c_points = np.delete(c_points, NaN_idx, axis=0)
    self.OutputMatrix = all_outputs

    # Save CollocationPoints
    grpX = file.create_group("EDX") if not hdf5_exist else file.get("EDX")
    if prevRun_No == 0 and key_str == '':
        grpX.create_dataset("init_"+key_str, data=c_points)
        if len(NaN_idx) != 0:
            grpX.create_dataset("New_init_"+key_str, data=new_c_points)

    else:
        try:
            name = f'EDX/adaptive_{key_str}'
            oldCollocationPoints = np.array(file[name])
            del file[f'EDX/adaptive_{key_str}']
            data = np.vstack((oldCollocationPoints, new_c_points))
        except KeyError:
            data = new_c_points
        grpX.create_dataset('adaptive_'+key_str, data=data)

        if len(NaN_idx) != 0:
            try:
                name = f'EDX/New_adaptive_{key_str}'
                oldCollocationPoints = np.array(file[name])
                del file[f'EDX/New_adaptive_{key_str}']
                data = np.vstack((oldCollocationPoints, new_c_points))
            except KeyError:
                data = new_c_points
            grpX.create_dataset('New_adaptive_'+key_str, data=data)

    # Close h5py file
    file.close()

    return all_outputs, new_c_points
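A hedged end-to-end sketch for the 'function' wrapper: evaluate the model at a few parameter sets with run_model_parallel. The module my_model.py and the sampling ranges are assumptions.

import numpy as np
from bayesvalidrox.pylink import PyLinkForwardModel

model = PyLinkForwardModel(
    link_type='function',
    name='my_model',
    py_file='my_model',          # my_model.py must define my_model(samples)
    output_names=['concentration'],
)

samples = np.random.uniform(low=0.0, high=1.0, size=(10, 2))
all_outputs, kept_samples = model.run_model_parallel(samples)

# all_outputs holds 'x_values' plus one array per entry of output_names;
# kept_samples contains the parameter sets whose runs returned no NaNs.
# Note that an ExpDesign_my_model.hdf5 file is written as a side effect.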
def zip_subdirs(self, dir_name, key)

Zips the contents of all subdirectories whose names contain the keyword and removes those subdirectories afterwards.

Parameters

dir_name : str
Directory name.
key : str
Keyword to search for.

Returns

None.

Expand source code
def zip_subdirs(self, dir_name, key):
    """
    Zips the contents of all subdirectories whose names contain the keyword
    and removes those subdirectories afterwards.

    Parameters
    ----------
    dir_name : str
        Directory name.
    key : str
        Keyword to search for.

    Returns
    -------
    None.

    """
    # setup file paths variable
    dir_list = []
    file_paths = []

    # Read all directory, subdirectories and file lists
    dir_path = os.getcwd()

    for root, directories, files in os.walk(dir_path):
        for directory in directories:
            # Create the full filepath by using os module.
            if key in directory:
                folderPath = os.path.join(dir_path, directory)
                dir_list.append(folderPath)

    # Loop over the identified directories to store the file paths
    for direct_name in dir_list:
        for root, directories, files in os.walk(direct_name):
            for filename in files:
                # Create the full filepath by using os module.
                filePath = os.path.join(root, filename)
                file_paths.append('.'+filePath.split(dir_path)[1])

    # writing files to a zipfile
    if len(file_paths) != 0:
        zip_file = zipfile.ZipFile(dir_name+'.zip', 'w')
        with zip_file:
            # writing each file one by one
            for file in file_paths:
                zip_file.write(file)

        file_paths = [path for path in os.listdir('.') if key in path]

        for path in file_paths:
            shutil.rmtree(path)

        print("\n")
        print(f'{dir_name}.zip file has been created successfully!\n')

    return
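A final sketch of archiving the per-run directories after all simulations have finished; the archive name and keyword are hypothetical. Run directories are created as '<model name><key_str>_<run number>', so the model name is a natural keyword.

from bayesvalidrox.pylink import PyLinkForwardModel

model = PyLinkForwardModel(name='my_solver')
# Collects every subdirectory whose name contains 'my_solver' into
# my_solver_runs.zip and removes the original directories afterwards.
model.zip_subdirs('my_solver_runs', key='my_solver')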