Module pylink

This program runs the model defined by the user.

Author: Farid Mohammadi, M.Sc.
E-Mail: farid.mohammadi@iws.uni-stuttgart.de
Department of Hydromechanics and Modelling of Hydrosystems (LH2)
Institute for Modelling Hydraulic and Environmental Systems (IWS), University of Stuttgart, www.iws.uni-stuttgart.de/lh2/
Pfaffenwaldring 61
70569 Stuttgart

Created in July 2019

Expand source code
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
This program runs the model defined by the user.

Author: Farid Mohammadi, M.Sc.
E-Mail: farid.mohammadi@iws.uni-stuttgart.de
Department of Hydromechanics and Modelling of Hydrosystems (LH2)
Institute for Modelling Hydraulic and Environmental Systems (IWS), University
of Stuttgart, www.iws.uni-stuttgart.de/lh2/
Pfaffenwaldring 61
70569 Stuttgart

Created in July 2019

"""

import os
import shutil
import h5py
import numpy as np
import time
import zipfile
import pandas as pd
from functools import reduce
import multiprocessing
import tqdm


class PyLinkForwardModel(object):
    """A forward model binder
    This calss serves as a code wrapper. This wrapper allows the execution of
        a third-party software/solver within the scope of BayesValidRox.
    The wrapper provides two options:
        1) link_type='PyLink':
            Runs the third-party software using a sell command with given input
            files.
        2) link_type='function':
            For this case, it is assumed that model can be run using a function
            written separately in a Python script. This function recieves the
            parameters in an array of shape (n_samples, n_params) and returns
            a dictionary with the x_values and output arrays for given output
            names.
    """

    def __init__(self, link_type='PyLink', name=None, shell_command='',
                 py_file=None, input_file=None, input_template=None,
                 aux_file=None, exe_path='', multi_process=True, n_cpus=None,
                 output_parser='', output_names=[], output_file_names=[],
                 meas_file=None, meas_file_valid=None, mc_ref_file=None,
                 obs_dict={}, obs_dict_valid={}, mc_ref_dict={}):
        self.link_type = link_type
        self.name = name
        self.shell_command = shell_command
        self.py_file = py_file
        self.input_file = input_file
        self.input_template = input_template
        self.aux_file = aux_file
        self.exe_path = exe_path
        self.multi_process = multi_process
        self.n_cpus = n_cpus
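        # Note: the assignments below set attributes on the nested Output
        # class itself, so they are shared by all instances of this class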
        self.Output.parser = output_parser
        self.Output.names = output_names
        self.Output.file_names = output_file_names
        self.meas_file = meas_file
        self.meas_file_valid = meas_file_valid
        self.mc_ref_file = mc_ref_file
        self.observations = obs_dict
        self.observations_valid = obs_dict_valid
        self.mc_reference = mc_ref_dict

    # Nested class
    class Output:
        def __init__(self):
            self.parser = None
            self.names = None
            self.file_names = None

    # -------------------------------------------------------------------------
    def within_range(self, out, minout, maxout):
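        """
        Checks whether all entries of out lie strictly between minout and
        maxout (element-wise).
        """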
        inside = False
        if (out > minout).all() and (out < maxout).all():
            inside = True
        return inside

    # -------------------------------------------------------------------------
    def read_observation(self, case='calib'):
        """
        Reads/prepares the observation/measurement data for calibration or
        validation.

        Returns
        -------
        DataFrame
            A dataframe with the calibration data.

        """
        if case.lower() == 'calib':
            if bool(self.observations):
                obs = pd.DataFrame.from_dict(self.observations)
            elif self.meas_file is not None:
                file_path = os.path.join(os.getcwd(), self.meas_file)
                obs = pd.read_csv(file_path, delimiter=',')
            else:
                raise Exception("Please provide the observation data as a "
                                "dictionary via the observations attribute or "
                                "pass the csv-file path to the meas_file "
                                "attribute")
        elif case.lower() == 'valid':
            if bool(self.observations_valid):
                obs = pd.DataFrame.from_dict(self.observations_valid)
            elif self.meas_file_valid is not None:
                file_path = os.path.join(os.getcwd(), self.meas_file_valid)
                obs = pd.read_csv(file_path, delimiter=',')
            else:
                raise Exception("Please provide the observation data as a "
                                "dictionary via the observations_valid "
                                "attribute or pass the csv-file path to the "
                                "meas_file_valid attribute")

        # Compute the number of observations
        n_obs = obs[self.Output.names].notnull().sum().values.sum()

        if case.lower() == 'calib':
            self.observations = obs
            self.n_obs = n_obs
            return self.observations
        elif case.lower() == 'valid':
            self.observations_valid = obs
            self.n_obs_valid = n_obs
            return self.observations_valid

    # -------------------------------------------------------------------------
    def read_mc_reference(self):
        """
        Reads the Monte-Carlo reference data, if available, for further
        in-depth post-processing after meta-model training.

        Returns
        -------
        DataFrame or None
            The Monte-Carlo reference data, or None if no reference is
            available.

        """
        if self.mc_ref_file is None and not hasattr(self, 'mc_reference'):
            return
        elif isinstance(self.mc_reference, dict) and bool(self.mc_reference):
            self.mc_reference = pd.DataFrame.from_dict(self.mc_reference)
        elif self.mc_ref_file is not None:
            file_path = os.path.join(os.getcwd(), self.mc_ref_file)
            self.mc_reference = pd.read_csv(file_path, delimiter=',')
        else:
            raise Exception("Please provide the MC reference data as a "
                            "dictionary via mc_reference attribute or pass the"
                            " csv-file path to mc_ref_file attribute")
        return self.mc_reference

    # -------------------------------------------------------------------------
    def read_output(self):
        """
        Imports the user-defined output parser and applies it to the model's
        output files. It is required when the model returns the simulation
        outputs in files (e.g. CSV).

        Returns
        -------
        output : array-like
            Simulation outputs extracted by the parser function.

        """
        output_func_name = self.Output.parser

        output_func = getattr(__import__(output_func_name), output_func_name)

        file_names = []
        for File in self.Output.file_names:
            file_names.append(os.path.join(self.exe_path, File))
        try:
            output = output_func(self.name, file_names)
        except TypeError:
            output = output_func(file_names)
        return output

    # -------------------------------------------------------------------------
    def update_input_params(self, new_input_file, param_sets):
        """
        Finds the placeholders <X1>, <X2>, ... in the new input file(s) and
        replaces them with the corresponding values from param_sets.

        Parameters
        ----------
        new_input_file : list of strings
            Path(s) of the input file(s) to be updated.
        param_sets : array of shape (n_params,)
            Parameter values to be inserted into the file(s).

        Returns
        -------
        None.

        """
        NofPa = param_sets.shape[0]
        text_to_search_list = [f'<X{i+1}>' for i in range(NofPa)]

        for filename in new_input_file:
            # Read in the file
            with open(filename, 'r') as file:
                filedata = file.read()

            # Replace the target string
            for text_to_search, params in zip(text_to_search_list, param_sets):
                filedata = filedata.replace(text_to_search, f'{params:0.4e}')

            # Write the file out again
            with open(filename, 'w') as file:
                file.write(filedata)

    # -------------------------------------------------------------------------
    def run_command(self, command, output_file_names):
        """
        Runs the execution command given by the user to run the given model.
        It checks whether the output files have been generated. If yes, the
        job is done and it extracts and returns the requested output(s).
        Otherwise, it executes the command again.

        Parameters
        ----------
        command : string
            The command to be executed.
        output_file_names : list
            Names of the expected output files.

        Returns
        -------
        simulation_outputs : array of shape (n_obs, n_outputs)
            Simulation outputs.

        """

        # Check if simulation is finished
        while True:
            time.sleep(3)
            files = os.listdir(".")
            if all(elem in files for elem in output_file_names):
                break
            else:
                # Run command
                Process = os.system(f'./../{command}')
                if Process != 0:
                    print('\nMessage 1:')
                    print(f"\tThe command returned the non-zero exit value "
                          f"'{Process}', which may indicate execution problems.\n")

        os.chdir("..")

        # Read the output
        simulation_outputs = self.read_output()

        return simulation_outputs

    # -------------------------------------------------------------------------
    def run_forwardmodel(self, xx):
        """
        Creates a subdirectory for the current run, copies the necessary
        files into it, renames them and finally executes the given command.
        """
        c_points, run_no, key_str = xx

        # Handle the case that only one input file is provided
        if not isinstance(self.input_template, list):
            self.input_template = [self.input_template]
        if not isinstance(self.input_file, list):
            self.input_file = [self.input_file]

        new_input_file = []
        # Loop over the InputTemplates:
        for in_temp in self.input_template:
            if '/' in in_temp:
                in_temp = in_temp.split('/')[-1]
            new_input_file.append(in_temp.split('.tpl')[0] + key_str +
                                  f"_{run_no+1}" + in_temp.split('.tpl')[1])

        # Create directories
        newpath = self.name + key_str + f'_{run_no+1}'
        if not os.path.exists(newpath):
            os.makedirs(newpath)

        # Copy the necessary files to the directories
        for in_temp in self.input_template:
            # Input file(s) of the model
            shutil.copy2(in_temp, newpath)
        # Auxiliary file
        if self.aux_file is not None:
            shutil.copy2(self.aux_file, newpath)  # Auxiliary file

        # Rename the input file(s) in the run directory
        os.chdir(newpath)
        for input_tem, input_file in zip(self.input_template, new_input_file):
            if '/' in input_tem:
                input_tem = input_tem.split('/')[-1]
            os.rename(input_tem, input_file)

        # Update the parameters in the input file(s)
        self.update_input_params(new_input_file, c_points)

        # Update the user defined command and the execution path
        try:
            new_command = self.shell_command.replace(self.input_file[0],
                                                     new_input_file[0])
            new_command = new_command.replace(self.input_file[1],
                                              new_input_file[1])
        except IndexError:
            # Only one input file was provided
            new_command = self.shell_command.replace(self.input_file[0],
                                                     new_input_file[0])
        # Set the exe path if not provided
        if not bool(self.exe_path):
            self.exe_path = os.getcwd()

        # Run the model
        output = self.run_command(new_command, self.Output.file_names)

        return output

    # -------------------------------------------------------------------------
    def run_model_parallel(self, c_points, prevRun_No=0, key_str='',
                           mp=True):
        """
        Runs model simulations. If mp is True (default), the simulations
        are started in parallel.

        Parameters
        ----------
        c_points : array like of shape (n_samples, n_params)
            Collocation points (training set).
        prevRun_No : int, optional
            Previous run number, in case the sequential design is selected.
            The default is 0.
        key_str : string, optional
            A descriptive string for validation runs. The default is ''.
        mp : bool, optional
            Multiprocessing. The default is True.

        Returns
        -------
        all_outputs : dict
            A dictionary with x values (time step or point id) and all outputs.
            Each key contains an array of the shape (n_samples, n_obs).
        new_c_points : array
            Updated collocation points (training set). If a simulation is not
            executed successfully, its parameter set is removed.

        """

        # Create hdf5 metadata
        hdf5file = f'ExpDesign_{self.name}.hdf5'
        hdf5_exist = os.path.exists(hdf5file)
        file = h5py.File(hdf5file, 'a')

        # Initialization
        n_c_points = len(c_points)
        self.n_outputs = len(self.Output.names)
        all_outputs = {}

        # Extract the function
        if self.link_type.lower() == 'function':
            # Prepare the function
            Function = getattr(__import__(self.py_file), self.py_file)
        # ---------------------------------------------------------------
        # -------------- Multiprocessing with Pool Class ----------------
        # ---------------------------------------------------------------
        # Start a pool with the number of CPUs
        if self.n_cpus is None:
            n_cpus = multiprocessing.cpu_count()
        else:
            n_cpus = self.n_cpus

        # Run forward model either normal or with multiprocessing
        if not self.multi_process:
            group_results = list([self.run_forwardmodel((c_points,
                                                         prevRun_No,
                                                         key_str))])
        else:
            with multiprocessing.Pool(n_cpus) as p:
                desc = f'Running forward model {key_str}'
                if self.link_type.lower() == 'function':
                    imap_var = p.imap(Function, c_points[:, np.newaxis])
                else:
                    args = zip(c_points,
                               [prevRun_No+i for i in range(n_c_points)],
                               [key_str]*n_c_points)
                    imap_var = p.imap(self.run_forwardmodel, args)

                group_results = list(tqdm.tqdm(imap_var, total=n_c_points,
                                               desc=desc))

        # Save time steps or x-values
        x_values = group_results[0][0]
        all_outputs["x_values"] = x_values
        if not hdf5_exist:
            if type(x_values) is dict:
                grp_x_values = file.create_group("x_values/")
                for varIdx, var in enumerate(self.Output.names):
                    grp_x_values.create_dataset(var, data=x_values[var])
            else:
                file.create_dataset("x_values", data=x_values)

        # Save each output in its corresponding array
        NaN_idx = []
        for varIdx, var in enumerate(self.Output.names):

            if not hdf5_exist:
                grpY = file.create_group("EDY/"+var)
            else:
                grpY = file.get("EDY/"+var)

            Outputs = np.asarray([item[varIdx+1] for item in group_results],
                                 dtype=np.float64)

            if prevRun_No == 0 and key_str == '':
                grpY.create_dataset(f'init_{key_str}', data=Outputs)
            else:
                try:
                    oldEDY = np.array(file[f'EDY/{var}/adaptive_{key_str}'])
                    del file[f'EDY/{var}/adaptive_{key_str}']
                    data = np.vstack((oldEDY, Outputs))
                except KeyError:
                    data = Outputs
                grpY.create_dataset('adaptive_'+key_str, data=data)

            NaN_idx = np.unique(np.argwhere(np.isnan(Outputs))[:, 0])
            all_outputs[var] = np.delete(Outputs, NaN_idx, axis=0)

            if prevRun_No == 0 and key_str == '':
                grpY.create_dataset(f"New_init_{key_str}",
                                    data=all_outputs[var])
            else:
                try:
                    name = f'EDY/{var}/New_adaptive_{key_str}'
                    oldEDY = np.array(file[name])
                    del file[f'EDY/{var}/New_adaptive_{key_str}']
                    data = np.vstack((oldEDY, all_outputs[var]))
                except KeyError:
                    data = all_outputs[var]
                grpY.create_dataset(f'New_adaptive_{key_str}', data=data)

        # Print the collocation points whose simulations crashed
        if len(NaN_idx) != 0:
            print('\n')
            print('*'*20)
            print("\nThe following parametersets have been removed:\n",
                  c_points[NaN_idx])
            print("\n")
            print('*'*20)

        # Pass it to the attribute
        new_c_points = np.delete(c_points, NaN_idx, axis=0)
        self.OutputMatrix = all_outputs

        # Save CollocationPoints
        grpX = file.create_group("EDX") if not hdf5_exist else file.get("EDX")
        if prevRun_No == 0 and key_str == '':
            grpX.create_dataset("init_"+key_str, data=c_points)
            if len(NaN_idx) != 0:
                grpX.create_dataset("New_init_"+key_str, data=new_c_points)

        else:
            try:
                name = f'EDX/adaptive_{key_str}'
                oldCollocationPoints = np.array(file[name])
                del file[f'EDX/adaptive_{key_str}']
                data = np.vstack((oldCollocationPoints, new_c_points))
            except KeyError:
                data = new_c_points
            grpX.create_dataset('adaptive_'+key_str, data=data)

            if len(NaN_idx) != 0:
                try:
                    name = f'EDX/New_adaptive_{key_str}'
                    oldCollocationPoints = np.array(file[name])
                    del file[f'EDX/New_adaptive_{key_str}']
                    data = np.vstack((oldCollocationPoints, new_c_points))
                except KeyError:
                    data = new_c_points
                grpX.create_dataset('New_adaptive_'+key_str, data=data)

        # Close h5py file
        file.close()

        return all_outputs, new_c_points

    # -------------------------------------------------------------------------
    def zip_subdirs(self, dir_name, key):
        """
        Zips all the files inside the subdirectories whose names contain
        the given key(word) and removes those directories afterwards.

        Parameters
        ----------
        dir_name : string
            Directory name.
        key : string
            Keyword to search for.

        Returns
        -------
        None.

        """
        # setup file paths variable
        dir_list = []
        file_paths = []

        # Read all directory, subdirectories and file lists
        dir_path = os.getcwd()

        for root, directories, files in os.walk(dir_path):
            for directory in directories:
                # Create the full filepath by using os module.
                if key in directory:
                    folderPath = os.path.join(dir_path, directory)
                    dir_list.append(folderPath)

        # Loop over the identified directories to store the file paths
        for direct_name in dir_list:
            for root, directories, files in os.walk(direct_name):
                for filename in files:
                    # Create the full filepath by using os module.
                    filePath = os.path.join(root, filename)
                    file_paths.append('.'+filePath.split(dir_path)[1])

        # writing files to a zipfile
        if len(file_paths) != 0:
            zip_file = zipfile.ZipFile(dir_name+'.zip', 'w')
            with zip_file:
                # writing each file one by one
                for file in file_paths:
                    zip_file.write(file)

            file_paths = [path for path in os.listdir('.') if key in path]

            for path in file_paths:
                shutil.rmtree(path)

            print("\n")
            print(f'{dir_name}.zip file has been created successfully!\n')

        return

Classes

class PyLinkForwardModel (link_type='PyLink', name=None, shell_command='', py_file=None, input_file=None, input_template=None, aux_file=None, exe_path='', multi_process=True, n_cpus=None, output_parser='', output_names=[], output_file_names=[], meas_file=None, meas_file_valid=None, mc_ref_file=None, obs_dict={}, obs_dict_valid={}, mc_ref_dict={})

A forward model binder. This class serves as a code wrapper that allows the execution of a third-party software/solver within the scope of BayesValidRox. The wrapper provides two options: 1) link_type='PyLink': Runs the third-party software using a shell command with given input files. 2) link_type='function': For this case, it is assumed that the model can be run using a function written separately in a Python script. This function receives the parameters in an array of shape (n_samples, n_params) and returns a dictionary with the x_values and output arrays for the given output names.
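
As an orientation only, a hypothetical setup for the shell-command mode might look as follows; the file names, the solver call and the output name 'Z' are illustrative placeholders, not part of this module:

from pylink import PyLinkForwardModel

model = PyLinkForwardModel(
    link_type='PyLink',
    name='MyModel',                            # run directories become 'MyModel<key>_<run_no>'
    shell_command='./my_solver my_model.inp',  # command executed inside each run directory
    input_file='my_model.inp',                 # file name referenced inside shell_command
    input_template='my_model.tpl.inp',         # template containing <X1>, <X2>, ... placeholders
    output_parser='read_my_output',            # module read_my_output.py providing read_my_output()
    output_names=['Z'],
    output_file_names=['my_results.csv'],
)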


Class variables

var Output

Methods

def read_mc_reference(self)

Reads the Monte-Carlo reference data, if available, for further in-depth post-processing after meta-model training.

Returns

DataFrame or None
The Monte-Carlo reference data as a dataframe, or None if no reference is available.
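
The reference can likewise be supplied as a dictionary; the 'mean' and 'std' entries below are only an assumption about what such a reference might contain:

from pylink import PyLinkForwardModel

mc_ref = {'x_values': [0, 1, 2, 3],
          'mean': [0.0, 0.5, 1.0, 1.5],   # assumed reference statistics
          'std':  [0.1, 0.1, 0.2, 0.2]}
model = PyLinkForwardModel(name='MyModel', output_names=['Z'], mc_ref_dict=mc_ref)
mc_df = model.read_mc_reference()          # converted to a pandas DataFrame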
 
def read_observation(self, case='calib')

Reads/prepares the observation/measurement data for calibration or validation.

Returns

DataFrame
A dataframe with the calibration data.
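
For instance, the calibration data can be passed directly as a dictionary whose keys match the output names; the output name 'Z' below is a hypothetical placeholder:

from pylink import PyLinkForwardModel

obs_dict = {'x_values': [0, 1, 2, 3], 'Z': [0.1, 0.4, 0.9, 1.6]}
model = PyLinkForwardModel(name='MyModel', output_names=['Z'], obs_dict=obs_dict)
obs_df = model.read_observation(case='calib')   # DataFrame with columns 'x_values' and 'Z'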
def read_output(self)

Imports the user-defined output parser and applies it to the model's output files. It is required when the model returns the simulation outputs in files (e.g. CSV).

Returns

output : array-like
Simulation outputs extracted by the parser function.
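
A minimal sketch of such a parser, assuming a hypothetical module read_my_output.py that reads a single CSV file; the calling code expects the x values at position 0, followed by one array per entry in output_names:

# read_my_output.py -- hypothetical parser, referenced via output_parser='read_my_output'
import pandas as pd

def read_my_output(name, file_names):
    """Called as read_my_output(model_name, list_of_output_files)."""
    df = pd.read_csv(file_names[0])
    x_values = df.iloc[:, 0].to_numpy()                       # first column: time steps / point ids
    outputs = [df[col].to_numpy() for col in df.columns[1:]]  # one array per output variable
    return [x_values] + outputs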
def run_command(self, command, output_file_names)

Runs the execution command given by the user to run the given model. It checks whether the output files have been generated. If yes, the job is done and it extracts and returns the requested output(s). Otherwise, it executes the command again.

Parameters

command : string
The command to be executed.
output_file_names : list
Names of the expected output files.

Returns

simulation_outputs : array of shape (n_obs, n_outputs)
Simulation outputs.
def run_forwardmodel(self, xx)

Creates a subdirectory for the current run, copies the necessary files into it, renames them and finally executes the given command.
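
The argument xx is a single tuple (parameter set, run number, key string). A hypothetical direct call (run_model_parallel normally does this internally) could look like:

import numpy as np

# model: a PyLinkForwardModel configured as in the class example above (hypothetical)
sample = np.array([0.5, 1.2, -0.3])               # one parameter set of shape (n_params,)
output = model.run_forwardmodel((sample, 0, ''))  # executes the run inside 'MyModel_1'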

def run_model_parallel(self, c_points, prevRun_No=0, key_str='', mp=True)

Runs model simulations. If mp is true (default), then the simulations are started in parallel.

Parameters

c_points : array like of shape (n_samples, n_params)
Collocation points (training set).
prevRun_No : int, optional
Previous run number, in case the sequential design is selected. The default is 0.
key_str : string, optional
A descriptive string for validation runs. The default is ''.
mp : bool, optional
Multiprocessing. The default is True.

Returns

all_outputs : dict
A dictionary with x values (time step or point id) and all outputs. Each key contains an array of the shape (n_samples, n_obs).
new_c_points : array
Updated collocation points (training set). If a simulation is not executed successfully, its parameter set is removed.
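
A minimal sketch of a complete run, assuming a hypothetical user module mymodel.py that provides a function mymodel for link_type='function':

import numpy as np
from pylink import PyLinkForwardModel

model = PyLinkForwardModel(link_type='function',
                           name='MyModel',
                           py_file='mymodel',     # imports mymodel.py and calls its function mymodel
                           output_names=['Z'])

samples = np.random.uniform(-1, 1, size=(50, 3))  # shape (n_samples, n_params)
outputs, valid_samples = model.run_model_parallel(samples)

# outputs is a dict with 'x_values' and one array per output name;
# valid_samples keeps only the parameter sets whose runs produced no NaNs.
# An HDF5 file 'ExpDesign_MyModel.hdf5' is created alongside.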
def update_input_params(self, new_input_file, param_sets)

Finds the placeholders <X1>, <X2>, ... in the new input file(s) and replaces them with the corresponding values from param_sets.

Parameters

new_input_file : list of strings
Path(s) of the input file(s) to be updated.
param_sets : array of shape (n_params,)
Parameter values to be inserted into the file(s).

Returns

None.
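
For illustration, assuming a hypothetical run-specific input file my_model_1.inp that contains the placeholders <X1> and <X2>:

import numpy as np

# model: a configured PyLinkForwardModel (hypothetical, see the class example)
# my_model_1.inp contains, e.g.:   permeability = <X1>
#                                  porosity     = <X2>
model.update_input_params(['my_model_1.inp'], np.array([1.2e-10, 0.35]))
# afterwards:                      permeability = 1.2000e-10
#                                  porosity     = 3.5000e-01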

def within_range(self, out, minout, maxout)

Checks whether all entries of out lie strictly between minout and maxout (element-wise).

def zip_subdirs(self, dir_name, key)

Zips all the files inside the subdirectories whose names contain the given key(word), then removes those directories.

Parameters

dir_name : string
Directory name.
key : string
Keyword to search for.

Returns

None.
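
For example, after a batch of runs the run directories can be archived and cleaned up in one call (names hypothetical):

# model: a configured PyLinkForwardModel (hypothetical, see the class example)
model.zip_subdirs('MyModel', key='MyModel_')   # creates MyModel.zip, then deletes the MyModel_* folders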
