Module bayesvalidrox.surrogate_models
Expand source code
# -*- coding: utf-8 -*-
from .surrogate_models import MetaModel
__all__ = [
"MetaModel"
]
Sub-modules
bayesvalidrox.surrogate_models.apoly_construction
bayesvalidrox.surrogate_models.bayes_linear
bayesvalidrox.surrogate_models.eval_rec_rule
-
Based on the implementation in UQLab [1] …
bayesvalidrox.surrogate_models.exp_designs
bayesvalidrox.surrogate_models.exploration
bayesvalidrox.surrogate_models.glexindex
-
Multi indices for monomial exponents. Credit: Jonathan Feinberg https://github.com/jonathf/numpoly/blob/master/numpoly/utils/glexindex.py
bayesvalidrox.surrogate_models.inputs
bayesvalidrox.surrogate_models.reg_fast_ard
-
Created on Tue Mar 24 19:41:45 2020 …
bayesvalidrox.surrogate_models.reg_fast_laplace
bayesvalidrox.surrogate_models.sequential_design
-
Created on Fri Jan 28 09:21:18 2022 …
bayesvalidrox.surrogate_models.surrogate_models
Classes
class MetaModel (input_obj, meta_model_type='PCE', pce_reg_method='OLS', pce_deg=1, pce_q_norm=1.0, dim_red_method='no', verbose=False)
-
Meta (surrogate) model
This class trains a surrogate model. It accepts an input object (input_obj) containing the specification of the distributions for uncertain parameters and a model object with instructions on how to run the computational model.
Attributes
input_obj
:obj
- Input object with the information on the model input parameters.
meta_model_type
:str
- Surrogate model type. Three surrogate model types are supported: polynomial chaos expansion (PCE), arbitrary PCE (aPCE) and Gaussian process regression (GPE). Default is PCE.
pce_reg_method
:str
- PCE regression method to compute the coefficients. The following regression methods are available:
- OLS: Ordinary Least Squares method
- BRR: Bayesian Ridge Regression
- LARS: Least Angle Regression
- ARD: Bayesian ARD Regression
- FastARD: Fast Bayesian ARD Regression
- VBL: Variational Bayesian Learning
- EBL: Empirical Bayesian Learning
Default is OLS.
pce_deg
:int or list of int
- Polynomial degree(s). If a list is given, an adaptive algorithm is used to find the best degree with the lowest Leave-One-Out cross-validation (LOO) error (or, equivalently, the highest score = 1 - LOO). Default is 1.
pce_q_norm
:float
- Hyperbolic (or q-norm) truncation for multi-indices of multivariate polynomials. Default is 1.0.
dim_red_method
:str
- Dimensionality reduction method for the output space. The available method is based on principal component analysis (PCA). The default is 'no'. There are two ways to select the number of components: a threshold on the percentage of explained variance (between 0 and 100, Option A) or a direct prescription of the number of components (Option B):
>>> MetaModelOpts.dim_red_method = 'PCA'
>>> MetaModelOpts.var_pca_threshold = 99.999  # Option A
>>> MetaModelOpts.n_pca_components = 12  # Option B
verbose
:bool
- Prints a summary of the regression results. Default is False.
Note
To define the sampling methods and the training set, an experimental design instance shall be defined. This can be done by:
>>> MetaModelOpts.add_ExpDesign()
Two experimental design schemes are supported: one-shot (normal) and adaptive sequential (sequential) designs. For experimental designs, refer to ExpDesigns.
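A minimal end-to-end sketch of the workflow described above. The inputs and model objects and the sampling-method name are assumed placeholders (their construction is documented in the inputs sub-module and the model class); the MetaModel calls follow the signatures documented on this page.

from bayesvalidrox.surrogate_models import MetaModel

# `inputs` and `model` are assumed, pre-built input and model objects.
MetaModelOpts = MetaModel(inputs, meta_model_type='PCE',
                          pce_reg_method='FastARD',
                          pce_deg=[1, 2, 3], pce_q_norm=0.75)

# Attach and configure a one-shot ('normal') experimental design.
MetaModelOpts.add_ExpDesign()
MetaModelOpts.ExpDesign.method = 'normal'
MetaModelOpts.ExpDesign.n_init_samples = 100
MetaModelOpts.ExpDesign.sampling_method = 'latin_hypercube'  # assumed name

# Train the surrogate, then evaluate it at freshly drawn samples.
PCEModel = MetaModelOpts.create_metamodel(model)
mean_pred, std_pred = PCEModel.eval_metamodel(nsamples=1000)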
Expand source code
class MetaModel: """ Meta (surrogate) model This class trains a surrogate model. It accepts an input object (input_obj) containing the specification of the distributions for uncertain parameters and a model object with instructions on how to run the computational model. Attributes ---------- input_obj : obj Input object with the information on the model input parameters. meta_model_type : str Surrogate model types. Three surrogate model types are supported: polynomial chaos expansion (`PCE`), arbitrary PCE (`aPCE`) and Gaussian process regression (`GPE`). Default is PCE. pce_reg_method : str PCE regression method to compute the coefficients. The following regression methods are available: 1. OLS: Ordinary Least Square method 2. BRR: Bayesian Ridge Regression 3. LARS: Least angle regression 4. ARD: Bayesian ARD Regression 5. FastARD: Fast Bayesian ARD Regression 6. VBL: Variational Bayesian Learning 7. EBL: Emperical Bayesian Learning Default is `OLS`. pce_deg : int or list of int Polynomial degree(s). If a list is given, an adaptive algorithm is used to find the best degree with the lowest Leave-One-Out cross-validation (LOO) error (or the highest score=1-LOO). Default is `1`. pce_q_norm : float Hyperbolic (or q-norm) truncation for multi-indices of multivariate polynomials. Default is `1.0`. dim_red_method : str Dimensionality reduction method for the output space. The available method is based on principal component analysis (PCA). The Default is `'no'`. There are two ways to select number of components: use percentage of the explainable variance threshold (between 0 and 100) (Option A) or direct prescription of components' number (Option B): >>> MetaModelOpts.dim_red_method = 'PCA' >>> MetaModelOpts.var_pca_threshold = 99.999 # Option A >>> MetaModelOpts.n_pca_components = 12 # Option B verbose : bool Prints summary of the regression results. Default is `False`. Note ------- To define the sampling methods and the training set, an experimental design instance shall be defined. This can be done by: >>> MetaModelOpts.add_ExpDesign() Two experimental design schemes are supported: one-shot (`normal`) and adaptive sequential (`sequential`) designs. For experimental design refer to `ExpDesigns`. """ def __init__(self, input_obj, meta_model_type='PCE', pce_reg_method='OLS', pce_deg=1, pce_q_norm=1.0, dim_red_method='no', verbose=False): self.input_obj = input_obj self.meta_model_type = meta_model_type self.pce_reg_method = pce_reg_method self.pce_deg = pce_deg self.pce_q_norm = pce_q_norm self.dim_red_method = dim_red_method self.verbose = False # ------------------------------------------------------------------------- def create_metamodel(self, Model): """ Starts the training of the meta-model for the model objects containg the given computational model. Parameters ---------- Model : obj Model object. Returns ------- metamodel : obj The meta model object. 
""" self.ModelObj = Model self.n_params = len(self.input_obj.Marginals) self.ExpDesignFlag = 'normal' # --- Prepare pce degree --- if self.meta_model_type.lower() == 'pce': if type(self.pce_deg) is not np.ndarray: self.pce_deg = np.array(self.pce_deg) if self.ExpDesign.method == 'sequential': from .sequential_design import SeqDesign seq_design = SeqDesign(self) metamodel = seq_design.train_seq_design(Model) elif self.ExpDesign.method == 'normal': self.ExpDesignFlag = 'normal' metamodel = self.train_norm_design(Model) else: raise Exception("The method for experimental design you requested" " has not been implemented yet.") # Zip the model run directories if self.ModelObj.link_type.lower() == 'pylink': Model.zip_subdirs(Model.name, f'{Model.name}_') return metamodel # ------------------------------------------------------------------------- def train_norm_design(self, Model, verbose=False): """ This function loops over the outputs and each time step/point and fits the meta model. Parameters ---------- Model : obj Model object. verbose : bool, optional Flag for a sequential design in silent mode. The default is False. Returns ------- self: obj Meta-model object. """ # Get the collocation points to run the forward model CollocationPoints, OutputDict = self.generate_ExpDesign(Model) # Initialize the nested dictionaries self.deg_dict = self.auto_vivification() self.q_norm_dict = self.auto_vivification() self.coeffs_dict = self.auto_vivification() self.basis_dict = self.auto_vivification() self.score_dict = self.auto_vivification() self.clf_poly = self.auto_vivification() self.gp_poly = self.auto_vivification() self.pca = self.auto_vivification() self.LCerror = self.auto_vivification() self.x_scaler = {} # Define the DegreeArray nSamples, ndim = CollocationPoints.shape self.DegreeArray = self.__select_degree(ndim, nSamples) # Generate all basis indices self.allBasisIndices = self.auto_vivification() for deg in self.DegreeArray: keys = self.allBasisIndices.keys() if deg not in np.fromiter(keys, dtype=float): # Generate the polynomial basis indices for qidx, q in enumerate(self.pce_q_norm): basis_indices = self.create_basis_indices(degree=deg, q_norm=q) self.allBasisIndices[str(deg)][str(q)] = basis_indices # Evaluate the univariate polynomials on ExpDesign if self.meta_model_type.lower() != 'gpe': self.univ_p_val = self.univ_basis_vals(CollocationPoints) if 'x_values' in OutputDict: self.ExpDesign.x_values = OutputDict['x_values'] del OutputDict['x_values'] # --- Loop through data points and fit the surrogate --- if not verbose: print(f"\n>>>> Training the {self.meta_model_type} metamodel " "started. 
<<<<<<\n") items = tqdm(OutputDict.items(), desc="Fitting regression") else: items = OutputDict.items() # For loop over the components/outputs for key, Output in items: # Dimensionality reduction with PCA, if specified if self.dim_red_method.lower() == 'pca': self.pca[key], target = self.pca_transformation(Output) else: target = Output # Parallel fit regression if self.meta_model_type.lower() == 'gpe': # Prepare the input matrix scaler = MinMaxScaler() X_S = scaler.fit_transform(CollocationPoints) self.x_scaler[key] = scaler out = Parallel(n_jobs=-1, prefer='threads')( delayed(self.gaussian_process_emulator)(X_S, target[:, idx]) for idx in range(target.shape[1])) for idx in range(target.shape[1]): self.gp_poly[key][f"y_{idx+1}"] = out[idx] else: out = Parallel(n_jobs=-1, prefer='threads')( delayed(self.adaptive_regression)(CollocationPoints, target[:, idx], idx) for idx in range(target.shape[1])) for i in range(target.shape[1]): # Create a dict to pass the variables self.deg_dict[key][f"y_{i+1}"] = out[i]['degree'] self.q_norm_dict[key][f"y_{i+1}"] = out[i]['qnorm'] self.coeffs_dict[key][f"y_{i+1}"] = out[i]['coeffs'] self.basis_dict[key][f"y_{i+1}"] = out[i]['multi_indices'] self.score_dict[key][f"y_{i+1}"] = out[i]['LOOCVScore'] self.clf_poly[key][f"y_{i+1}"] = out[i]['clf_poly'] self.LCerror[key][f"y_{i+1}"] = out[i]['LCerror'] if not verbose: print(f"\n>>>> Training the {self.meta_model_type} metamodel" " sucessfully completed. <<<<<<\n") return self # ------------------------------------------------------------------------- def create_basis_indices(self, degree, q_norm): """ Creates set of selected multi-indices of multivariate polynomials for certain parameter numbers, polynomial degree, hyperbolic (or q-norm) truncation scheme. Parameters ---------- degree : int Polynomial degree. q_norm : float hyperbolic (or q-norm) truncation. Returns ------- basis_indices : array of shape (n_terms, n_params) Multi-indices of multivariate polynomials. """ basis_indices = glexindex(start=0, stop=degree+1, dimensions=self.n_params, cross_truncation=q_norm, reverse=False, graded=True) return basis_indices # ------------------------------------------------------------------------- def add_ExpDesign(self): """ Instanciates experimental design object. Returns ------- None. """ self.ExpDesign = ExpDesigns(self.input_obj, meta_Model=self.meta_model_type) # ------------------------------------------------------------------------- def generate_ExpDesign(self, Model): """ Prepares the experimental design either by reading from the prescribed data or running simulations. Parameters ---------- Model : obj Model object. Raises ------ Exception If model sumulations are not provided properly. Returns ------- ED_X_tr: array of shape (n_samples, n_params) Training samples transformed by an isoprobabilistic transformation. ED_Y: dict Model simulations (target) for all outputs. 
""" ExpDesign = self.ExpDesign if self.ExpDesignFlag != 'sequential': # Read ExpDesign (training and targets) from the provided hdf5 if ExpDesign.hdf5_file is not None: # Read hdf5 file f = h5py.File(ExpDesign.hdf5_file, 'r+') # Read EDX and pass it to ExpDesign object try: ExpDesign.X = np.array(f["EDX/New_init_"]) except KeyError: ExpDesign.X = np.array(f["EDX/init_"]) # Update number of initial samples ExpDesign.n_init_samples = ExpDesign.X.shape[0] # Read EDX and pass it to ExpDesign object out_names = self.ModelObj.Output.names ExpDesign.Y = {} # Extract x values try: ExpDesign.Y["x_values"] = dict() for varIdx, var in enumerate(out_names): x = np.array(f[f"x_values/{var}"]) ExpDesign.Y["x_values"][var] = x except KeyError: ExpDesign.Y["x_values"] = np.array(f["x_values"]) # Store the output for varIdx, var in enumerate(out_names): try: y = np.array(f[f"EDY/{var}/New_init_"]) except KeyError: y = np.array(f[f"EDY/{var}/init_"]) ExpDesign.Y[var] = y f.close() else: # Check if an old hdf5 file exists: if yes, rename it hdf5file = f'ExpDesign_{self.ModelObj.name}.hdf5' if os.path.exists(hdf5file): os.rename(hdf5file, 'old_'+hdf5file) # ---- Prepare X samples ---- ED_X, ED_X_tr = ExpDesign.generate_ED(ExpDesign.n_init_samples, ExpDesign.sampling_method, transform=True, max_pce_deg=np.max(self.pce_deg)) ExpDesign.X = ED_X ExpDesign.collocationPoints = ED_X_tr self.bound_tuples = ExpDesign.bound_tuples # ---- Run simulations at X ---- if not hasattr(ExpDesign, 'Y') or ExpDesign.Y is None: print('\n Now the forward model needs to be run!\n') ED_Y, up_ED_X = Model.run_model_parallel(ED_X) ExpDesign.X = up_ED_X self.ModelOutputDict = ED_Y ExpDesign.Y = ED_Y else: # Check if a dict has been passed. if type(ExpDesign.Y) is dict: self.ModelOutputDict = ExpDesign.Y else: raise Exception('Please provide either a dictionary or a hdf5' 'file to ExpDesign.hdf5_file argument.') return ED_X_tr, self.ModelOutputDict # ------------------------------------------------------------------------- def univ_basis_vals(self, samples, n_max=None): """ Evaluates univariate regressors along input directions. Parameters ---------- samples : array of shape (n_samples, n_params) Samples. n_max : int, optional Maximum polynomial degree. The default is `None`. Returns ------- univ_basis: array of shape (n_samples, n_params, n_max+1) All univariate regressors up to n_max. """ # Extract information poly_types = self.ExpDesign.poly_types if samples.ndim != 2: samples = samples.reshape(1, len(samples)) n_max = np.max(self.pce_deg) if n_max is None else n_max # Extract poly coeffs if self.ExpDesign.input_data_given or self.ExpDesign.apce: apolycoeffs = self.ExpDesign.polycoeffs else: apolycoeffs = None # Evaluate univariate basis univ_basis = eval_univ_basis(samples, n_max, poly_types, apolycoeffs) return univ_basis # ------------------------------------------------------------------------- def create_psi(self, basis_indices, univ_p_val): """ This function assemble the design matrix Psi from the given basis index set INDICES and the univariate polynomial evaluations univ_p_val. Parameters ---------- basis_indices : array of shape (n_terms, n_params) Multi-indices of multivariate polynomials. univ_p_val : array of (n_samples, n_params, n_max+1) All univariate regressors up to `n_max`. Raises ------ ValueError n_terms in arguments do not match. Returns ------- psi : array of shape (n_samples, n_terms) Multivariate regressors. 
""" # Check if BasisIndices is a sparse matrix sparsity = sp.sparse.issparse(basis_indices) if sparsity: basis_indices = basis_indices.toarray() # Initialization and consistency checks # number of input variables n_params = univ_p_val.shape[1] # Size of the experimental design n_samples = univ_p_val.shape[0] # number of basis terms n_terms = basis_indices.shape[0] # check that the variables have consistent sizes if n_params != basis_indices.shape[1]: raise ValueError("The shapes of basis_indices and univ_p_val don't" " match!!") # Preallocate the Psi matrix for performance psi = np.ones((n_samples, n_terms)) # Assemble the Psi matrix for m in range(basis_indices.shape[1]): aa = np.where(basis_indices[:, m] > 0)[0] try: basisIdx = basis_indices[aa, m] bb = np.reshape(univ_p_val[:, m, basisIdx], psi[:, aa].shape) psi[:, aa] = np.multiply(psi[:, aa], bb) except ValueError as err: raise err return psi # ------------------------------------------------------------------------- def fit(self, X, y, basis_indices, reg_method=None): """ Fit regression using the regression method provided. Parameters ---------- X : array of shape (n_samples, n_features) Training vector, where n_samples is the number of samples and n_features is the number of features. y : array of shape (n_samples,) Target values. basis_indices : array of shape (n_terms, n_params) Multi-indices of multivariate polynomials. reg_method : str, optional DESCRIPTION. The default is None. Returns ------- return_out_dict : Dict Fitted estimator, spareMulti-Index, sparseX and coefficients. """ if reg_method is None: reg_method = self.pce_reg_method # Check if BasisIndices is a sparse matrix sparsity = sp.sparse.issparse(basis_indices) clf_poly = [] compute_score = True if self.verbose else False # inverse of the observed variance of the data if np.var(y) != 0: Lambda = 1 / np.var(y) else: Lambda = 1e-6 # Bayes sparse adaptive aPCE if reg_method.lower() != 'ols': if reg_method.lower() == 'brr' or np.var(y) == 0: clf_poly = lm.BayesianRidge(n_iter=1000, tol=1e-7, fit_intercept=True, normalize=True, compute_score=compute_score, alpha_1=1e-04, alpha_2=1e-04, lambda_1=Lambda, lambda_2=Lambda) clf_poly.converged = True elif reg_method.lower() == 'ard': clf_poly = lm.ARDRegression(fit_intercept=True, normalize=True, compute_score=compute_score, n_iter=1000, tol=0.0001, alpha_1=1e-3, alpha_2=1e-3, lambda_1=Lambda, lambda_2=Lambda) elif reg_method.lower() == 'fastard': clf_poly = RegressionFastARD(fit_intercept=True, normalize=True, compute_score=compute_score, n_iter=300, tol=1e-10) elif reg_method.lower() == 'bcs': clf_poly = RegressionFastLaplace(fit_intercept=False, n_iter=1000, tol=1e-7) elif reg_method.lower() == 'lars': clf_poly = lm.LassoLarsCV(fit_intercept=False) elif reg_method.lower() == 'sgdr': clf_poly = lm.SGDRegressor(fit_intercept=False, max_iter=5000, tol=1e-7) elif reg_method.lower() == 'omp': clf_poly = lm.OrthogonalMatchingPursuitCV(fit_intercept=False, max_iter=10) elif reg_method.lower() == 'vbl': clf_poly = VBLinearRegression(fit_intercept=False) elif reg_method.lower() == 'ebl': clf_poly = EBLinearRegression(optimizer='em') # Fit clf_poly.fit(X, y) # Select the nonzero entries of coefficients # The first column must be kept (For mean calculations) nnz_idx = np.nonzero(clf_poly.coef_)[0] if len(nnz_idx) == 0 or nnz_idx[0] != 0: nnz_idx = np.insert(np.nonzero(clf_poly.coef_)[0], 0, 0) # Remove the zero entries for Bases and PSI if need be if sparsity: sparse_basis_indices = basis_indices.toarray()[nnz_idx] else: 
sparse_basis_indices = basis_indices[nnz_idx] sparse_X = X[:, nnz_idx] # Store the coefficients of the regression model clf_poly.fit(sparse_X, y) coeffs = clf_poly.coef_ else: # This is for the case where all outputs are zero, thereby # all coefficients are zero if sparsity: sparse_basis_indices = basis_indices.toarray() else: sparse_basis_indices = basis_indices sparse_X = X coeffs = clf_poly.coef_ # Ordinary least square method (OLS) else: if sparsity: sparse_basis_indices = basis_indices.toarray() else: sparse_basis_indices = basis_indices sparse_X = X X_T_X = np.dot(sparse_X.T, sparse_X) if np.linalg.cond(X_T_X) > 1e-12 and \ np.linalg.cond(X_T_X) < 1 / sys.float_info.epsilon: # faster coeffs = sp.linalg.solve(X_T_X, np.dot(sparse_X.T, y)) else: # stabler coeffs = np.dot(np.dot(np.linalg.pinv(X_T_X), sparse_X.T), y) # Create a dict to pass the outputs return_out_dict = dict() return_out_dict['clf_poly'] = clf_poly return_out_dict['spareMulti-Index'] = sparse_basis_indices return_out_dict['sparePsi'] = sparse_X return_out_dict['coeffs'] = coeffs return return_out_dict # -------------------------------------------------------------------------------------------------------- def adaptive_regression(self, ED_X, ED_Y, varIdx, verbose=False): """ Adaptively fits the PCE model by comparing the scores of different degrees and q-norm. Parameters ---------- ED_X : array of shape (n_samples, n_params) Experimental design. ED_Y : array of shape (n_samples,) Target values, i.e. simulation results for the Experimental design. varIdx : int Index of the output. verbose : bool, optional Print out summary. The default is False. Returns ------- returnVars : Dict Fitted estimator, best degree, best q-norm, LOOCVScore and coefficients. """ NrSamples, n_params = ED_X.shape # Initialization qAllCoeffs, AllCoeffs = {}, {} qAllIndices_Sparse, AllIndices_Sparse = {}, {} qAllclf_poly, Allclf_poly = {}, {} qAllnTerms, AllnTerms = {}, {} qAllLCerror, AllLCerror = {}, {} # Extract degree array and qnorm array DegreeArray = np.array([*self.allBasisIndices], dtype=int) qnorm = [*self.allBasisIndices[str(int(DegreeArray[0]))]] # Some options for EarlyStop errorIncreases = False # Stop degree, if LOO error does not decrease n_checks_degree times n_checks_degree = 3 # Stop qNorm, if criterion isn't fulfilled n_checks_qNorm times n_checks_qNorm = 2 nqnorms = len(qnorm) qNormEarlyStop = True if nqnorms < n_checks_qNorm+1: qNormEarlyStop = False # ===================================================================== # basis adaptive polynomial chaos: repeat the calculation by increasing # polynomial degree until the highest accuracy is reached # ===================================================================== # For each degree check all q-norms and choose the best one scores = -np.inf * np.ones(DegreeArray.shape[0]) qNormScores = -np.inf * np.ones(nqnorms) for degIdx, deg in enumerate(DegreeArray): for qidx, q in enumerate(qnorm): # Extract the polynomial basis indices from the pool of # allBasisIndices BasisIndices = self.allBasisIndices[str(deg)][str(q)] # Assemble the Psi matrix Psi = self.create_psi(BasisIndices, self.univ_p_val) # Calulate the cofficients of the meta model outs = self.fit(Psi, ED_Y, BasisIndices) # Calculate and save the score of LOOCV score, LCerror = self.corr_loocv_error(outs['clf_poly'], outs['sparePsi'], outs['coeffs'], ED_Y) # Check the convergence of noise for FastARD if self.pce_reg_method == 'FastARD' and \ outs['clf_poly'].alpha_ < np.finfo(np.float32).eps: score = -np.inf 
qNormScores[qidx] = score qAllCoeffs[str(qidx+1)] = outs['coeffs'] qAllIndices_Sparse[str(qidx+1)] = outs['spareMulti-Index'] qAllclf_poly[str(qidx+1)] = outs['clf_poly'] qAllnTerms[str(qidx+1)] = BasisIndices.shape[0] qAllLCerror[str(qidx+1)] = LCerror # EarlyStop check # if there are at least n_checks_qNorm entries after the # best one, we stop if qNormEarlyStop and \ sum(np.isfinite(qNormScores)) > n_checks_qNorm: # If the error has increased the last two iterations, stop! qNormScores_nonInf = qNormScores[np.isfinite(qNormScores)] deltas = np.sign(np.diff(qNormScores_nonInf)) if sum(deltas[-n_checks_qNorm+1:]) == 2: # stop the q-norm loop here break if np.var(ED_Y) == 0: break # Store the score in the scores list best_q = np.nanargmax(qNormScores) scores[degIdx] = qNormScores[best_q] AllCoeffs[str(degIdx+1)] = qAllCoeffs[str(best_q+1)] AllIndices_Sparse[str(degIdx+1)] = qAllIndices_Sparse[str(best_q+1)] Allclf_poly[str(degIdx+1)] = qAllclf_poly[str(best_q+1)] AllnTerms[str(degIdx+1)] = qAllnTerms[str(best_q+1)] AllLCerror[str(degIdx+1)] = qAllLCerror[str(best_q+1)] # Check the direction of the error (on average): # if it increases consistently stop the iterations if len(scores[scores != -np.inf]) > n_checks_degree: scores_nonInf = scores[scores != -np.inf] ss = np.sign(scores_nonInf - np.max(scores_nonInf)) # ss<0 error decreasing errorIncreases = np.sum(np.sum(ss[-2:])) <= -1*n_checks_degree if errorIncreases: break # Check only one degree, if target matrix has zero variance if np.var(ED_Y) == 0: break # ------------------ Summary of results ------------------ # Select the one with the best score and save the necessary outputs best_deg = np.nanargmax(scores)+1 coeffs = AllCoeffs[str(best_deg)] basis_indices = AllIndices_Sparse[str(best_deg)] clf_poly = Allclf_poly[str(best_deg)] LOOCVScore = np.nanmax(scores) P = AllnTerms[str(best_deg)] LCerror = AllLCerror[str(best_deg)] degree = DegreeArray[np.nanargmax(scores)] qnorm = float(qnorm[best_q]) # ------------------ Print out Summary of results ------------------ if self.verbose: # Create PSI_Sparse by removing redundent terms nnz_idx = np.nonzero(coeffs)[0] BasisIndices_Sparse = basis_indices[nnz_idx] print(f'Output variable {varIdx+1}:') print('The estimation of PCE coefficients converged at polynomial ' f'degree {DegreeArray[best_deg-1]} with ' f'{len(BasisIndices_Sparse)} terms (Sparsity index = ' f'{round(len(BasisIndices_Sparse)/P, 3)}).') print(f'Final ModLOO error estimate: {1-max(scores):.3e}') print('\n'+'-'*50) if verbose: print('='*50) print(' '*10 + ' Summary of results ') print('='*50) print("scores:\n", scores) print("Best score's degree:", self.DegreeArray[best_deg-1]) print("NO. 
of terms:", len(basis_indices)) print("Sparsity index:", round(len(basis_indices)/P, 3)) print("Best Indices:\n", basis_indices) if self.pce_reg_method in ['BRR', 'ARD']: fig, ax = plt.subplots(figsize=(12, 10)) plt.title("Marginal log-likelihood") plt.plot(clf_poly.scores_, color='navy', linewidth=2) plt.ylabel("Score") plt.xlabel("Iterations") if self.pce_reg_method.lower() == 'bbr': text = f"$\\alpha={clf_poly.alpha_:.1f}$\n" f"$\\lambda={clf_poly.lambda_:.3f}$\n" f"$L={clf_poly.scores_[-1]:.1f}$" else: text = f"$\\alpha={clf_poly.alpha_:.1f}$\n$" f"\\L={clf_poly.scores_[-1]:.1f}$" plt.text(0.75, 0.5, text, fontsize=18, transform=ax.transAxes) plt.show() print('='*80) # Create a dict to pass the outputs returnVars = dict() returnVars['clf_poly'] = clf_poly returnVars['degree'] = degree returnVars['qnorm'] = qnorm returnVars['coeffs'] = coeffs returnVars['multi_indices'] = basis_indices returnVars['LOOCVScore'] = LOOCVScore returnVars['LCerror'] = LCerror return returnVars # ------------------------------------------------------------------------- def corr_loocv_error(self, clf, psi, coeffs, y): """ Calculates the corrected LOO error for regression on regressor matrix `psi` that generated the coefficients based on [1] and [2]. [1] Blatman, G., 2009. Adaptive sparse polynomial chaos expansions for uncertainty propagation and sensitivity analysis (Doctoral dissertation, Clermont-Ferrand 2). [2] Blatman, G. and Sudret, B., 2011. Adaptive sparse polynomial chaos expansion based on least angle regression. Journal of computational Physics, 230(6), pp.2345-2367. Parameters ---------- clf : object Fitted estimator. psi : array of shape (n_samples, n_features) The multivariate orthogonal polynomials (regressor). coeffs : array-like of shape (n_features,) Estimated cofficients. y : array of shape (n_samples,) Target values. Returns ------- Q_2 : float LOOCV Validation score (1-LOOCV erro). residual : array of shape (n_samples,) Residual values (y - predicted targets). 
""" psi = np.array(psi, dtype=float) # Create PSI_Sparse by removing redundent terms nnz_idx = np.nonzero(coeffs)[0] if len(nnz_idx) == 0: nnz_idx = [0] psi_sparse = psi[:, nnz_idx] # NrCoeffs of aPCEs P = len(nnz_idx) # NrEvaluation (Size of experimental design) N = psi.shape[0] # Build the projection matrix PsiTPsi = np.dot(psi_sparse.T, psi_sparse) if np.linalg.cond(PsiTPsi) > 1e-12 and \ np.linalg.cond(PsiTPsi) < 1/sys.float_info.epsilon: # faster M = sp.linalg.solve(PsiTPsi, sp.sparse.eye(PsiTPsi.shape[0]).toarray()) else: # stabler M = np.linalg.pinv(PsiTPsi) # h factor (the full matrix is not calculated explicitly, # only the trace is, to save memory) PsiM = np.dot(psi_sparse, M) h = np.sum(np.multiply(PsiM, psi_sparse), axis=1, dtype=np.float128) # ------ Calculate Error Loocv for each measurement point ---- # Residuals if isinstance(clf, list): residual = np.dot(psi, coeffs) - y else: residual = clf.predict(psi) - y # Variance varY = np.var(y) if varY == 0: normEmpErr = 0 ErrLoo = 0 LCerror = np.zeros((y.shape)) else: normEmpErr = np.mean(residual**2)/varY # LCerror = np.divide(residual, (1-h)) LCerror = residual / (1-h)[:, np.newaxis] ErrLoo = np.mean(np.square(LCerror)) / varY # if there are NaNs, just return an infinite LOO error (this # happens, e.g., when a strongly underdetermined problem is solved) if np.isnan(ErrLoo): ErrLoo = np.inf # Corrected Error for over-determined system trM = np.trace(M) if trM < 0 or abs(trM) > 1e6: trM = np.trace(np.linalg.pinv(np.dot(psi.T, psi))) # Over-determined system of Equation if N > P: T_factor = N/(N-P) * (1 + trM) # Under-determined system of Equation else: T_factor = np.inf CorrectedErrLoo = ErrLoo * T_factor Q_2 = 1 - CorrectedErrLoo return Q_2, residual # ------------------------------------------------------------------------- def pca_transformation(self, Output): """ Transforms the targets (outputs) via Principal Component Analysis Parameters ---------- Output : array of shape (n_samples,) Target values. Returns ------- pca : obj Fitted sklearnPCA object. OutputMatrix : array of shape (n_samples,) Transformed target values. """ # Transform via Principal Component Analysis if hasattr(self, 'var_pca_threshold'): var_pca_threshold = self.var_pca_threshold else: var_pca_threshold = 100.0 n_samples, n_features = Output.shape if hasattr(self, 'n_pca_components'): n_pca_components = self.n_pca_components else: # Instantiate and fit sklearnPCA object covar_matrix = sklearnPCA(n_components=None) covar_matrix.fit(Output) var = np.cumsum(np.round(covar_matrix.explained_variance_ratio_, decimals=5)*100) # Find the number of components to explain self.varPCAThreshold of # variance try: n_components = np.where(var >= var_pca_threshold)[0][0] + 1 except IndexError: n_components = min(n_samples, n_features) n_pca_components = min(n_samples, n_features, n_components) # Print out a report print() print('-' * 50) print(f"PCA transformation is performed with {n_pca_components}" " components.") print('-' * 50) print() # Fit and transform with the selected number of components pca = sklearnPCA(n_components=n_pca_components, svd_solver='randomized') OutputMatrix = pca.fit_transform(Output) return pca, OutputMatrix # ------------------------------------------------------------------------- def gaussian_process_emulator(self, X, y, nug_term=None, autoSelect=False, varIdx=None): """ Fits a Gaussian Process Emulator to the target given the training points. Parameters ---------- X : array of shape (n_samples, n_params) Training points. 
y : array of shape (n_samples,) Target values. nug_term : float, optional Nugget term. The default is None, i.e. variance of y. autoSelect : bool, optional Loop over some kernels and select the best. The default is False. varIdx : int, optional The index number. The default is None. Returns ------- gp : object Fitted estimator. """ nug_term = nug_term if nug_term else np.var(y) Kernels = [nug_term * kernels.RBF(length_scale=1.0, length_scale_bounds=(1e-25, 1e15)), nug_term * kernels.RationalQuadratic(length_scale=0.2, alpha=1.0), nug_term * kernels.Matern(length_scale=1.0, length_scale_bounds=(1e-15, 1e5), nu=1.5)] # Automatic selection of the kernel if autoSelect: gp = {} BME = [] for i, kernel in enumerate(Kernels): gp[i] = GaussianProcessRegressor(kernel=kernel, n_restarts_optimizer=3, normalize_y=False) # Fit to data using Maximum Likelihood Estimation gp[i].fit(X, y) # Store the MLE as BME score BME.append(gp[i].log_marginal_likelihood()) gp = gp[np.argmax(BME)] else: gp = GaussianProcessRegressor(kernel=Kernels[0], n_restarts_optimizer=3, normalize_y=False) gp.fit(X, y) # Compute score if varIdx is not None: Score = gp.score(X, y) print('-'*50) print(f'Output variable {varIdx}:') print('The estimation of GPE coefficients converged,') print(f'with the R^2 score: {Score:.3f}') print('-'*50) return gp # ------------------------------------------------------------------------- def eval_metamodel(self, samples=None, nsamples=None, sampling_method='random', return_samples=False): """ Evaluates meta-model at the requested samples. One can also generate nsamples. Parameters ---------- samples : array of shape (n_samples, n_params), optional Samples to evaluate meta-model at. The default is None. nsamples : int, optional Number of samples to generate, if no `samples` is provided. The default is None. sampling_method : str, optional Type of sampling, if no `samples` is provided. The default is 'random'. return_samples : bool, optional Retun samples, if no `samples` is provided. The default is False. Returns ------- mean_pred : dict Mean of the predictions. std_pred : dict Standard deviatioon of the predictions. 
""" if self.meta_model_type.lower() == 'gpe': model_dict = self.gp_poly else: model_dict = self.coeffs_dict if samples is None: if nsamples is None: self.n_samples = 100000 else: self.n_samples = nsamples samples = self.ExpDesign.generate_samples(self.n_samples, sampling_method) else: self.samples = samples self.n_samples = len(samples) # Transform samples samples = self.ExpDesign.transform(samples) if self.meta_model_type.lower() != 'gpe': univ_p_val = self.univ_basis_vals(samples, n_max=np.max(self.pce_deg)) mean_pred = {} std_pred = {} # Loop over outputs for ouput, values in model_dict.items(): mean = np.zeros((len(samples), len(values))) std = np.zeros((len(samples), len(values))) idx = 0 for in_key, InIdxValues in values.items(): # Perdiction with GPE if self.meta_model_type.lower() == 'gpe': X_T = self.x_scaler[ouput].transform(samples) gp = self.gp_poly[ouput][in_key] y_mean, y_std = gp.predict(X_T, return_std=True) else: # Perdiction with PCE or pcekriging # Assemble Psi matrix psi = self.create_psi(self.basis_dict[ouput][in_key], univ_p_val) # Perdiction try: # with error bar clf_poly = self.clf_poly[ouput][in_key] y_mean, y_std = clf_poly.predict(psi, return_std=True) except: # without error bar coeffs = self.coeffs_dict[ouput][in_key] y_mean = np.dot(psi, coeffs) y_std = np.zeros_like(y_mean) mean[:, idx] = y_mean std[:, idx] = y_std idx += 1 if self.dim_red_method.lower() == 'pca': PCA = self.pca[ouput] mean_pred[ouput] = PCA.mean_ + np.dot(mean, PCA.components_) std_pred[ouput] = np.sqrt(np.dot(std**2, PCA.components_**2)) else: mean_pred[ouput] = mean std_pred[ouput] = std if return_samples: return mean_pred, std_pred, samples else: return mean_pred, std_pred # ------------------------------------------------------------------------- def create_model_error(self, X, y, name='Calib'): """ Fits a GPE-based model error. Parameters ---------- X : array of shape (n_outputs, n_inputs) Input array. It can contain any forcing inputs or coordinates of extracted data. y : array of shape (n_outputs,) The model response for the MAP parameter set. name : str, optional Calibration or validation. The default is `'Calib'`. Returns ------- self: object Self object. """ Model = self.ModelObj outputNames = Model.Output.Names self.errorRegMethod = 'GPE' self.errorclf_poly = self.auto_vivification() self.errorScale = self.auto_vivification() # Read data MeasuredData = Model.read_observation(case=name) # Fitting GPR based bias model for out in outputNames: nan_idx = ~np.isnan(MeasuredData[out]) # Select data try: data = MeasuredData[out].values[nan_idx] except AttributeError: data = MeasuredData[out][nan_idx] # Prepare the input matrix scaler = MinMaxScaler() delta = data # - y[out][0] BiasInputs = np.hstack((X[out], y[out].reshape(-1, 1))) X_S = scaler.fit_transform(BiasInputs) gp = self.gaussian_process_emulator(X_S, delta) self.errorScale[out]["y_1"] = scaler self.errorclf_poly[out]["y_1"] = gp return self # ------------------------------------------------------------------------- def eval_model_error(self, X, y_pred): """ Evaluates the error model. Parameters ---------- X : array Inputs. y_pred : dict Predictions. Returns ------- mean_pred : dict Mean predition of the GPE-based error model. std_pred : dict standard deviation of the GPE-based error model. 
""" mean_pred = {} std_pred = {} for Outkey, ValuesDict in self.errorclf_poly.items(): pred_mean = np.zeros_like(y_pred[Outkey]) pred_std = np.zeros_like(y_pred[Outkey]) for Inkey, InIdxValues in ValuesDict.items(): gp = self.errorclf_poly[Outkey][Inkey] scaler = self.errorScale[Outkey][Inkey] # Transform Samples using scaler for j, pred in enumerate(y_pred[Outkey]): BiasInputs = np.hstack((X[Outkey], pred.reshape(-1, 1))) Samples_S = scaler.transform(BiasInputs) y_hat, y_std = gp.predict(Samples_S, return_std=True) pred_mean[j] = y_hat pred_std[j] = y_std # pred_mean[j] += pred mean_pred[Outkey] = pred_mean std_pred[Outkey] = pred_std return mean_pred, std_pred # ------------------------------------------------------------------------- class auto_vivification(dict): """ Implementation of perl's AutoVivification feature. Source: https://stackoverflow.com/a/651879/18082457 """ def __getitem__(self, item): try: return dict.__getitem__(self, item) except KeyError: value = self[item] = type(self)() return value # ------------------------------------------------------------------------- def __select_degree(self, ndim, nSamples): """ Selects degree based on the number of samples and parameters in the sequential design. Parameters ---------- ndim : TYPE DESCRIPTION. nSamples : TYPE DESCRIPTION. Returns ------- TYPE DESCRIPTION. """ # Define the DegreeArray max_deg = np.max(self.pce_deg) min_Deg = np.min(self.pce_deg) nitr = nSamples - self.ExpDesign.n_init_samples # Check q-norm if not np.isscalar(self.pce_q_norm): self.pce_q_norm = np.array(self.pce_q_norm) else: self.pce_q_norm = np.array([self.pce_q_norm]) def M_uptoMax(maxDeg): n_combo = np.zeros(maxDeg) for i, d in enumerate(range(1, maxDeg+1)): n_combo[i] = math.factorial(ndim+d) n_combo[i] /= math.factorial(ndim) * math.factorial(d) return n_combo if self.ExpDesignFlag != 'sequential': degNew = max_deg else: d = nitr if nitr != 0 and self.n_params > 5 else 1 min_index = np.argmin(abs(M_uptoMax(max_deg)-ndim*nSamples*d)) degNew = range(1, max_deg+1)[min_index] if degNew > min_Deg and self.pce_reg_method.lower() != 'fastard': DegreeArray = np.arange(min_Deg, degNew+1) else: DegreeArray = np.array([degNew]) return DegreeArray
Class variables
var auto_vivification
-
Implementation of Perl's AutoVivification feature.
Methods
def create_metamodel(self, Model)
-
Starts the training of the meta-model for the model object containing the given computational model.
Parameters
Model
:obj
- Model object.
Returns
metamodel
:obj
- The meta model object.
Expand source code
def create_metamodel(self, Model): """ Starts the training of the meta-model for the model objects containg the given computational model. Parameters ---------- Model : obj Model object. Returns ------- metamodel : obj The meta model object. """ self.ModelObj = Model self.n_params = len(self.input_obj.Marginals) self.ExpDesignFlag = 'normal' # --- Prepare pce degree --- if self.meta_model_type.lower() == 'pce': if type(self.pce_deg) is not np.ndarray: self.pce_deg = np.array(self.pce_deg) if self.ExpDesign.method == 'sequential': from .sequential_design import SeqDesign seq_design = SeqDesign(self) metamodel = seq_design.train_seq_design(Model) elif self.ExpDesign.method == 'normal': self.ExpDesignFlag = 'normal' metamodel = self.train_norm_design(Model) else: raise Exception("The method for experimental design you requested" " has not been implemented yet.") # Zip the model run directories if self.ModelObj.link_type.lower() == 'pylink': Model.zip_subdirs(Model.name, f'{Model.name}_') return metamodel
def train_norm_design(self, Model, verbose=False)
-
This function loops over the outputs and each time step/point and fits the meta-model.
Parameters
Model
:obj
- Model object.
verbose
:bool
, optional- Flag for a sequential design in silent mode. The default is False.
Returns
self
:obj
- Meta-model object.
Expand source code
def train_norm_design(self, Model, verbose=False): """ This function loops over the outputs and each time step/point and fits the meta model. Parameters ---------- Model : obj Model object. verbose : bool, optional Flag for a sequential design in silent mode. The default is False. Returns ------- self: obj Meta-model object. """ # Get the collocation points to run the forward model CollocationPoints, OutputDict = self.generate_ExpDesign(Model) # Initialize the nested dictionaries self.deg_dict = self.auto_vivification() self.q_norm_dict = self.auto_vivification() self.coeffs_dict = self.auto_vivification() self.basis_dict = self.auto_vivification() self.score_dict = self.auto_vivification() self.clf_poly = self.auto_vivification() self.gp_poly = self.auto_vivification() self.pca = self.auto_vivification() self.LCerror = self.auto_vivification() self.x_scaler = {} # Define the DegreeArray nSamples, ndim = CollocationPoints.shape self.DegreeArray = self.__select_degree(ndim, nSamples) # Generate all basis indices self.allBasisIndices = self.auto_vivification() for deg in self.DegreeArray: keys = self.allBasisIndices.keys() if deg not in np.fromiter(keys, dtype=float): # Generate the polynomial basis indices for qidx, q in enumerate(self.pce_q_norm): basis_indices = self.create_basis_indices(degree=deg, q_norm=q) self.allBasisIndices[str(deg)][str(q)] = basis_indices # Evaluate the univariate polynomials on ExpDesign if self.meta_model_type.lower() != 'gpe': self.univ_p_val = self.univ_basis_vals(CollocationPoints) if 'x_values' in OutputDict: self.ExpDesign.x_values = OutputDict['x_values'] del OutputDict['x_values'] # --- Loop through data points and fit the surrogate --- if not verbose: print(f"\n>>>> Training the {self.meta_model_type} metamodel " "started. <<<<<<\n") items = tqdm(OutputDict.items(), desc="Fitting regression") else: items = OutputDict.items() # For loop over the components/outputs for key, Output in items: # Dimensionality reduction with PCA, if specified if self.dim_red_method.lower() == 'pca': self.pca[key], target = self.pca_transformation(Output) else: target = Output # Parallel fit regression if self.meta_model_type.lower() == 'gpe': # Prepare the input matrix scaler = MinMaxScaler() X_S = scaler.fit_transform(CollocationPoints) self.x_scaler[key] = scaler out = Parallel(n_jobs=-1, prefer='threads')( delayed(self.gaussian_process_emulator)(X_S, target[:, idx]) for idx in range(target.shape[1])) for idx in range(target.shape[1]): self.gp_poly[key][f"y_{idx+1}"] = out[idx] else: out = Parallel(n_jobs=-1, prefer='threads')( delayed(self.adaptive_regression)(CollocationPoints, target[:, idx], idx) for idx in range(target.shape[1])) for i in range(target.shape[1]): # Create a dict to pass the variables self.deg_dict[key][f"y_{i+1}"] = out[i]['degree'] self.q_norm_dict[key][f"y_{i+1}"] = out[i]['qnorm'] self.coeffs_dict[key][f"y_{i+1}"] = out[i]['coeffs'] self.basis_dict[key][f"y_{i+1}"] = out[i]['multi_indices'] self.score_dict[key][f"y_{i+1}"] = out[i]['LOOCVScore'] self.clf_poly[key][f"y_{i+1}"] = out[i]['clf_poly'] self.LCerror[key][f"y_{i+1}"] = out[i]['LCerror'] if not verbose: print(f"\n>>>> Training the {self.meta_model_type} metamodel" " sucessfully completed. <<<<<<\n") return self
def create_basis_indices(self, degree, q_norm)
-
Creates a set of selected multi-indices of multivariate polynomials for a given number of parameters, polynomial degree and hyperbolic (or q-norm) truncation scheme.
Parameters
degree
:int
- Polynomial degree.
q_norm
:float
- Hyperbolic (or q-norm) truncation.
Returns
basis_indices
:array
of shape (n_terms, n_params)
- Multi-indices of multivariate polynomials.
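A hedged usage sketch, assuming a MetaModel instance metamodel with two input parameters: with degree 2 and q_norm = 1.0 the full total-degree basis is returned, while a q-norm below 1 truncates high-interaction terms.

full_basis = metamodel.create_basis_indices(degree=2, q_norm=1.0)
# six multi-indices: [0, 0], [1, 0], [0, 1], [2, 0], [1, 1], [0, 2]
# (ordering as produced by glexindex)
sparse_basis = metamodel.create_basis_indices(degree=2, q_norm=0.5)
# the mixed index [1, 1] is dropped: (1**0.5 + 1**0.5)**(1/0.5) = 4 > 2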
Expand source code
def create_basis_indices(self, degree, q_norm):
    """
    Creates a set of selected multi-indices of multivariate polynomials
    for a given number of parameters, polynomial degree and hyperbolic
    (or q-norm) truncation scheme.

    Parameters
    ----------
    degree : int
        Polynomial degree.
    q_norm : float
        Hyperbolic (or q-norm) truncation.

    Returns
    -------
    basis_indices : array of shape (n_terms, n_params)
        Multi-indices of multivariate polynomials.
    """
    basis_indices = glexindex(start=0, stop=degree+1,
                              dimensions=self.n_params,
                              cross_truncation=q_norm,
                              reverse=False, graded=True)
    return basis_indices
def add_ExpDesign(self)
-
Instantiates the experimental design object.
Returns
None.
Expand source code
def add_ExpDesign(self):
    """
    Instantiates the experimental design object.

    Returns
    -------
    None.
    """
    self.ExpDesign = ExpDesigns(self.input_obj,
                                meta_Model=self.meta_model_type)
def generate_ExpDesign(self, Model)
-
Prepares the experimental design either by reading from the prescribed data or running simulations.
Parameters
Model
:obj
- Model object.
Raises
Exception
- If model simulations are not provided properly.
Returns
ED_X_tr
:array
of shape (n_samples, n_params)
- Training samples transformed by an isoprobabilistic transformation.
ED_Y
:dict
- Model simulations (target) for all outputs.
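A hedged sketch of the hdf5 layout this method reads when ExpDesign.hdf5_file is set. The group and dataset names are taken from the reader code; the arrays X, x_vals, Y and the output name 'Z' are assumed placeholders.

import h5py

with h5py.File('ExpDesign_MyModel.hdf5', 'w') as f:
    f.create_dataset('EDX/init_', data=X)       # training samples
    f.create_dataset('x_values', data=x_vals)   # shared x coordinates
    f.create_dataset('EDY/Z/init_', data=Y)     # model outputs for output 'Z'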
Expand source code
def generate_ExpDesign(self, Model): """ Prepares the experimental design either by reading from the prescribed data or running simulations. Parameters ---------- Model : obj Model object. Raises ------ Exception If model sumulations are not provided properly. Returns ------- ED_X_tr: array of shape (n_samples, n_params) Training samples transformed by an isoprobabilistic transformation. ED_Y: dict Model simulations (target) for all outputs. """ ExpDesign = self.ExpDesign if self.ExpDesignFlag != 'sequential': # Read ExpDesign (training and targets) from the provided hdf5 if ExpDesign.hdf5_file is not None: # Read hdf5 file f = h5py.File(ExpDesign.hdf5_file, 'r+') # Read EDX and pass it to ExpDesign object try: ExpDesign.X = np.array(f["EDX/New_init_"]) except KeyError: ExpDesign.X = np.array(f["EDX/init_"]) # Update number of initial samples ExpDesign.n_init_samples = ExpDesign.X.shape[0] # Read EDX and pass it to ExpDesign object out_names = self.ModelObj.Output.names ExpDesign.Y = {} # Extract x values try: ExpDesign.Y["x_values"] = dict() for varIdx, var in enumerate(out_names): x = np.array(f[f"x_values/{var}"]) ExpDesign.Y["x_values"][var] = x except KeyError: ExpDesign.Y["x_values"] = np.array(f["x_values"]) # Store the output for varIdx, var in enumerate(out_names): try: y = np.array(f[f"EDY/{var}/New_init_"]) except KeyError: y = np.array(f[f"EDY/{var}/init_"]) ExpDesign.Y[var] = y f.close() else: # Check if an old hdf5 file exists: if yes, rename it hdf5file = f'ExpDesign_{self.ModelObj.name}.hdf5' if os.path.exists(hdf5file): os.rename(hdf5file, 'old_'+hdf5file) # ---- Prepare X samples ---- ED_X, ED_X_tr = ExpDesign.generate_ED(ExpDesign.n_init_samples, ExpDesign.sampling_method, transform=True, max_pce_deg=np.max(self.pce_deg)) ExpDesign.X = ED_X ExpDesign.collocationPoints = ED_X_tr self.bound_tuples = ExpDesign.bound_tuples # ---- Run simulations at X ---- if not hasattr(ExpDesign, 'Y') or ExpDesign.Y is None: print('\n Now the forward model needs to be run!\n') ED_Y, up_ED_X = Model.run_model_parallel(ED_X) ExpDesign.X = up_ED_X self.ModelOutputDict = ED_Y ExpDesign.Y = ED_Y else: # Check if a dict has been passed. if type(ExpDesign.Y) is dict: self.ModelOutputDict = ExpDesign.Y else: raise Exception('Please provide either a dictionary or a hdf5' 'file to ExpDesign.hdf5_file argument.') return ED_X_tr, self.ModelOutputDict
def univ_basis_vals(self, samples, n_max=None)
-
Evaluates univariate regressors along input directions.
Parameters
samples
:array
of shape (n_samples, n_params)
- Samples.
n_max
:int
, optional- Maximum polynomial degree. The default is None.
Returns
univ_basis
:array
of shape (n_samples, n_params, n_max+1)
- All univariate regressors up to n_max.
Expand source code
def univ_basis_vals(self, samples, n_max=None): """ Evaluates univariate regressors along input directions. Parameters ---------- samples : array of shape (n_samples, n_params) Samples. n_max : int, optional Maximum polynomial degree. The default is `None`. Returns ------- univ_basis: array of shape (n_samples, n_params, n_max+1) All univariate regressors up to n_max. """ # Extract information poly_types = self.ExpDesign.poly_types if samples.ndim != 2: samples = samples.reshape(1, len(samples)) n_max = np.max(self.pce_deg) if n_max is None else n_max # Extract poly coeffs if self.ExpDesign.input_data_given or self.ExpDesign.apce: apolycoeffs = self.ExpDesign.polycoeffs else: apolycoeffs = None # Evaluate univariate basis univ_basis = eval_univ_basis(samples, n_max, poly_types, apolycoeffs) return univ_basis
def create_psi(self, basis_indices, univ_p_val)
-
This function assembles the design matrix Psi from the given basis index set basis_indices and the univariate polynomial evaluations univ_p_val.
Parameters
basis_indices
:array
of shape (n_terms, n_params)
- Multi-indices of multivariate polynomials.
univ_p_val
:array
of shape (n_samples, n_params, n_max+1)
- All univariate regressors up to n_max.
Raises
ValueError
- If n_terms of the arguments do not match.
Returns
psi
:array
of shape (n_samples, n_terms)
- Multivariate regressors.
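A standalone numpy sketch of the assembly performed here: each column of Psi is the product, over the input directions, of the univariate polynomial whose degree is prescribed by the corresponding multi-index.

import numpy as np

def assemble_psi(basis_indices, univ_p_val):
    # basis_indices: (n_terms, n_params), univ_p_val: (n_samples, n_params, n_max+1)
    n_samples = univ_p_val.shape[0]
    n_terms = basis_indices.shape[0]
    psi = np.ones((n_samples, n_terms))
    for j, alpha in enumerate(basis_indices):   # loop over basis terms
        for m, deg in enumerate(alpha):         # loop over input directions
            if deg > 0:
                psi[:, j] *= univ_p_val[:, m, deg]
    return psi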
Expand source code
def create_psi(self, basis_indices, univ_p_val): """ This function assemble the design matrix Psi from the given basis index set INDICES and the univariate polynomial evaluations univ_p_val. Parameters ---------- basis_indices : array of shape (n_terms, n_params) Multi-indices of multivariate polynomials. univ_p_val : array of (n_samples, n_params, n_max+1) All univariate regressors up to `n_max`. Raises ------ ValueError n_terms in arguments do not match. Returns ------- psi : array of shape (n_samples, n_terms) Multivariate regressors. """ # Check if BasisIndices is a sparse matrix sparsity = sp.sparse.issparse(basis_indices) if sparsity: basis_indices = basis_indices.toarray() # Initialization and consistency checks # number of input variables n_params = univ_p_val.shape[1] # Size of the experimental design n_samples = univ_p_val.shape[0] # number of basis terms n_terms = basis_indices.shape[0] # check that the variables have consistent sizes if n_params != basis_indices.shape[1]: raise ValueError("The shapes of basis_indices and univ_p_val don't" " match!!") # Preallocate the Psi matrix for performance psi = np.ones((n_samples, n_terms)) # Assemble the Psi matrix for m in range(basis_indices.shape[1]): aa = np.where(basis_indices[:, m] > 0)[0] try: basisIdx = basis_indices[aa, m] bb = np.reshape(univ_p_val[:, m, basisIdx], psi[:, aa].shape) psi[:, aa] = np.multiply(psi[:, aa], bb) except ValueError as err: raise err return psi
def fit(self, X, y, basis_indices, reg_method=None)
-
Fits the regression using the provided regression method.
Parameters
X
:array
of shape (n_samples, n_features)
- Training vector, where n_samples is the number of samples and n_features is the number of features.
y
:array
of shape (n_samples,)
- Target values.
basis_indices
:array
of shape (n_terms, n_params)
- Multi-indices of multivariate polynomials.
reg_method
:str
, optional- PCE regression method to use; if None, the pce_reg_method of the class is used. The default is None.
Returns
return_out_dict
:Dict
- Fitted estimator, sparse multi-indices ('spareMulti-Index'), sparse design matrix ('sparePsi') and coefficients ('coeffs').
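A minimal sketch of the default OLS branch (pce_reg_method='OLS'), mirroring the conditioning check in the source: solve the normal equations when the Gram matrix is well conditioned, otherwise fall back to the pseudo-inverse.

import sys
import numpy as np
from scipy import linalg

def ols_coeffs(psi, y):
    gram = psi.T @ psi
    if 1e-12 < np.linalg.cond(gram) < 1 / sys.float_info.epsilon:
        return linalg.solve(gram, psi.T @ y)    # faster
    return np.linalg.pinv(gram) @ psi.T @ y     # more stable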
Expand source code
def fit(self, X, y, basis_indices, reg_method=None): """ Fit regression using the regression method provided. Parameters ---------- X : array of shape (n_samples, n_features) Training vector, where n_samples is the number of samples and n_features is the number of features. y : array of shape (n_samples,) Target values. basis_indices : array of shape (n_terms, n_params) Multi-indices of multivariate polynomials. reg_method : str, optional DESCRIPTION. The default is None. Returns ------- return_out_dict : Dict Fitted estimator, spareMulti-Index, sparseX and coefficients. """ if reg_method is None: reg_method = self.pce_reg_method # Check if BasisIndices is a sparse matrix sparsity = sp.sparse.issparse(basis_indices) clf_poly = [] compute_score = True if self.verbose else False # inverse of the observed variance of the data if np.var(y) != 0: Lambda = 1 / np.var(y) else: Lambda = 1e-6 # Bayes sparse adaptive aPCE if reg_method.lower() != 'ols': if reg_method.lower() == 'brr' or np.var(y) == 0: clf_poly = lm.BayesianRidge(n_iter=1000, tol=1e-7, fit_intercept=True, normalize=True, compute_score=compute_score, alpha_1=1e-04, alpha_2=1e-04, lambda_1=Lambda, lambda_2=Lambda) clf_poly.converged = True elif reg_method.lower() == 'ard': clf_poly = lm.ARDRegression(fit_intercept=True, normalize=True, compute_score=compute_score, n_iter=1000, tol=0.0001, alpha_1=1e-3, alpha_2=1e-3, lambda_1=Lambda, lambda_2=Lambda) elif reg_method.lower() == 'fastard': clf_poly = RegressionFastARD(fit_intercept=True, normalize=True, compute_score=compute_score, n_iter=300, tol=1e-10) elif reg_method.lower() == 'bcs': clf_poly = RegressionFastLaplace(fit_intercept=False, n_iter=1000, tol=1e-7) elif reg_method.lower() == 'lars': clf_poly = lm.LassoLarsCV(fit_intercept=False) elif reg_method.lower() == 'sgdr': clf_poly = lm.SGDRegressor(fit_intercept=False, max_iter=5000, tol=1e-7) elif reg_method.lower() == 'omp': clf_poly = lm.OrthogonalMatchingPursuitCV(fit_intercept=False, max_iter=10) elif reg_method.lower() == 'vbl': clf_poly = VBLinearRegression(fit_intercept=False) elif reg_method.lower() == 'ebl': clf_poly = EBLinearRegression(optimizer='em') # Fit clf_poly.fit(X, y) # Select the nonzero entries of coefficients # The first column must be kept (For mean calculations) nnz_idx = np.nonzero(clf_poly.coef_)[0] if len(nnz_idx) == 0 or nnz_idx[0] != 0: nnz_idx = np.insert(np.nonzero(clf_poly.coef_)[0], 0, 0) # Remove the zero entries for Bases and PSI if need be if sparsity: sparse_basis_indices = basis_indices.toarray()[nnz_idx] else: sparse_basis_indices = basis_indices[nnz_idx] sparse_X = X[:, nnz_idx] # Store the coefficients of the regression model clf_poly.fit(sparse_X, y) coeffs = clf_poly.coef_ else: # This is for the case where all outputs are zero, thereby # all coefficients are zero if sparsity: sparse_basis_indices = basis_indices.toarray() else: sparse_basis_indices = basis_indices sparse_X = X coeffs = clf_poly.coef_ # Ordinary least square method (OLS) else: if sparsity: sparse_basis_indices = basis_indices.toarray() else: sparse_basis_indices = basis_indices sparse_X = X X_T_X = np.dot(sparse_X.T, sparse_X) if np.linalg.cond(X_T_X) > 1e-12 and \ np.linalg.cond(X_T_X) < 1 / sys.float_info.epsilon: # faster coeffs = sp.linalg.solve(X_T_X, np.dot(sparse_X.T, y)) else: # stabler coeffs = np.dot(np.dot(np.linalg.pinv(X_T_X), sparse_X.T), y) # Create a dict to pass the outputs return_out_dict = dict() return_out_dict['clf_poly'] = clf_poly return_out_dict['spareMulti-Index'] = sparse_basis_indices 
return_out_dict['sparePsi'] = sparse_X return_out_dict['coeffs'] = coeffs return return_out_dict
def adaptive_regression(self, ED_X, ED_Y, varIdx, verbose=False)
-
Adaptively fits the PCE model by comparing the scores of different degrees and q-norms.
Parameters
ED_X
:array
of shape (n_samples, n_params)
- Experimental design.
ED_Y
:array
of shape (n_samples,)
- Target values, i.e. simulation results for the experimental design.
varIdx
:int
- Index of the output.
verbose
:bool
, optional- Print out summary. The default is False.
Returns
returnVars
:Dict
- Fitted estimator, best degree, best q-norm, LOOCVScore and coefficients.
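A simplified, conceptual sketch of the selection loop: one candidate basis per (degree, q-norm) pair is fitted and the combination with the highest corrected LOO score is kept. The real method adds early-stopping heuristics; metamodel and y_train are assumed here, and metamodel.univ_p_val must already have been evaluated (as done in train_norm_design).

import numpy as np

best = {'score': -np.inf}
for deg in metamodel.DegreeArray:
    for q in np.atleast_1d(metamodel.pce_q_norm):
        basis = metamodel.create_basis_indices(degree=deg, q_norm=q)
        psi = metamodel.create_psi(basis, metamodel.univ_p_val)
        out = metamodel.fit(psi, y_train, basis)
        score, _ = metamodel.corr_loocv_error(out['clf_poly'],
                                              out['sparePsi'],
                                              out['coeffs'], y_train)
        if score > best['score']:
            best = {'score': score, 'degree': deg, 'qnorm': q,
                    'coeffs': out['coeffs'], 'basis': out['spareMulti-Index']}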
Expand source code
def adaptive_regression(self, ED_X, ED_Y, varIdx, verbose=False): """ Adaptively fits the PCE model by comparing the scores of different degrees and q-norm. Parameters ---------- ED_X : array of shape (n_samples, n_params) Experimental design. ED_Y : array of shape (n_samples,) Target values, i.e. simulation results for the Experimental design. varIdx : int Index of the output. verbose : bool, optional Print out summary. The default is False. Returns ------- returnVars : Dict Fitted estimator, best degree, best q-norm, LOOCVScore and coefficients. """ NrSamples, n_params = ED_X.shape # Initialization qAllCoeffs, AllCoeffs = {}, {} qAllIndices_Sparse, AllIndices_Sparse = {}, {} qAllclf_poly, Allclf_poly = {}, {} qAllnTerms, AllnTerms = {}, {} qAllLCerror, AllLCerror = {}, {} # Extract degree array and qnorm array DegreeArray = np.array([*self.allBasisIndices], dtype=int) qnorm = [*self.allBasisIndices[str(int(DegreeArray[0]))]] # Some options for EarlyStop errorIncreases = False # Stop degree, if LOO error does not decrease n_checks_degree times n_checks_degree = 3 # Stop qNorm, if criterion isn't fulfilled n_checks_qNorm times n_checks_qNorm = 2 nqnorms = len(qnorm) qNormEarlyStop = True if nqnorms < n_checks_qNorm+1: qNormEarlyStop = False # ===================================================================== # basis adaptive polynomial chaos: repeat the calculation by increasing # polynomial degree until the highest accuracy is reached # ===================================================================== # For each degree check all q-norms and choose the best one scores = -np.inf * np.ones(DegreeArray.shape[0]) qNormScores = -np.inf * np.ones(nqnorms) for degIdx, deg in enumerate(DegreeArray): for qidx, q in enumerate(qnorm): # Extract the polynomial basis indices from the pool of # allBasisIndices BasisIndices = self.allBasisIndices[str(deg)][str(q)] # Assemble the Psi matrix Psi = self.create_psi(BasisIndices, self.univ_p_val) # Calulate the cofficients of the meta model outs = self.fit(Psi, ED_Y, BasisIndices) # Calculate and save the score of LOOCV score, LCerror = self.corr_loocv_error(outs['clf_poly'], outs['sparePsi'], outs['coeffs'], ED_Y) # Check the convergence of noise for FastARD if self.pce_reg_method == 'FastARD' and \ outs['clf_poly'].alpha_ < np.finfo(np.float32).eps: score = -np.inf qNormScores[qidx] = score qAllCoeffs[str(qidx+1)] = outs['coeffs'] qAllIndices_Sparse[str(qidx+1)] = outs['spareMulti-Index'] qAllclf_poly[str(qidx+1)] = outs['clf_poly'] qAllnTerms[str(qidx+1)] = BasisIndices.shape[0] qAllLCerror[str(qidx+1)] = LCerror # EarlyStop check # if there are at least n_checks_qNorm entries after the # best one, we stop if qNormEarlyStop and \ sum(np.isfinite(qNormScores)) > n_checks_qNorm: # If the error has increased the last two iterations, stop! 
qNormScores_nonInf = qNormScores[np.isfinite(qNormScores)] deltas = np.sign(np.diff(qNormScores_nonInf)) if sum(deltas[-n_checks_qNorm+1:]) == 2: # stop the q-norm loop here break if np.var(ED_Y) == 0: break # Store the score in the scores list best_q = np.nanargmax(qNormScores) scores[degIdx] = qNormScores[best_q] AllCoeffs[str(degIdx+1)] = qAllCoeffs[str(best_q+1)] AllIndices_Sparse[str(degIdx+1)] = qAllIndices_Sparse[str(best_q+1)] Allclf_poly[str(degIdx+1)] = qAllclf_poly[str(best_q+1)] AllnTerms[str(degIdx+1)] = qAllnTerms[str(best_q+1)] AllLCerror[str(degIdx+1)] = qAllLCerror[str(best_q+1)] # Check the direction of the error (on average): # if it increases consistently stop the iterations if len(scores[scores != -np.inf]) > n_checks_degree: scores_nonInf = scores[scores != -np.inf] ss = np.sign(scores_nonInf - np.max(scores_nonInf)) # ss<0 error decreasing errorIncreases = np.sum(np.sum(ss[-2:])) <= -1*n_checks_degree if errorIncreases: break # Check only one degree, if target matrix has zero variance if np.var(ED_Y) == 0: break # ------------------ Summary of results ------------------ # Select the one with the best score and save the necessary outputs best_deg = np.nanargmax(scores)+1 coeffs = AllCoeffs[str(best_deg)] basis_indices = AllIndices_Sparse[str(best_deg)] clf_poly = Allclf_poly[str(best_deg)] LOOCVScore = np.nanmax(scores) P = AllnTerms[str(best_deg)] LCerror = AllLCerror[str(best_deg)] degree = DegreeArray[np.nanargmax(scores)] qnorm = float(qnorm[best_q]) # ------------------ Print out Summary of results ------------------ if self.verbose: # Create PSI_Sparse by removing redundent terms nnz_idx = np.nonzero(coeffs)[0] BasisIndices_Sparse = basis_indices[nnz_idx] print(f'Output variable {varIdx+1}:') print('The estimation of PCE coefficients converged at polynomial ' f'degree {DegreeArray[best_deg-1]} with ' f'{len(BasisIndices_Sparse)} terms (Sparsity index = ' f'{round(len(BasisIndices_Sparse)/P, 3)}).') print(f'Final ModLOO error estimate: {1-max(scores):.3e}') print('\n'+'-'*50) if verbose: print('='*50) print(' '*10 + ' Summary of results ') print('='*50) print("scores:\n", scores) print("Best score's degree:", self.DegreeArray[best_deg-1]) print("NO. of terms:", len(basis_indices)) print("Sparsity index:", round(len(basis_indices)/P, 3)) print("Best Indices:\n", basis_indices) if self.pce_reg_method in ['BRR', 'ARD']: fig, ax = plt.subplots(figsize=(12, 10)) plt.title("Marginal log-likelihood") plt.plot(clf_poly.scores_, color='navy', linewidth=2) plt.ylabel("Score") plt.xlabel("Iterations") if self.pce_reg_method.lower() == 'bbr': text = f"$\\alpha={clf_poly.alpha_:.1f}$\n" f"$\\lambda={clf_poly.lambda_:.3f}$\n" f"$L={clf_poly.scores_[-1]:.1f}$" else: text = f"$\\alpha={clf_poly.alpha_:.1f}$\n$" f"\\L={clf_poly.scores_[-1]:.1f}$" plt.text(0.75, 0.5, text, fontsize=18, transform=ax.transAxes) plt.show() print('='*80) # Create a dict to pass the outputs returnVars = dict() returnVars['clf_poly'] = clf_poly returnVars['degree'] = degree returnVars['qnorm'] = qnorm returnVars['coeffs'] = coeffs returnVars['multi_indices'] = basis_indices returnVars['LOOCVScore'] = LOOCVScore returnVars['LCerror'] = LCerror return returnVars
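A usage sketch (hypothetical names; the method is driven internally by create_metamodel and assumes the pool of candidate bases allBasisIndices and the univariate polynomial evaluations univ_p_val have already been prepared for the training set):
>>> out = MetaModelOpts.adaptive_regression(ED_X, ED_Y, varIdx=0)
>>> out['degree'], out['qnorm'], out['LOOCVScore']
>>> coeffs, indices = out['coeffs'], out['multi_indices']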
def corr_loocv_error(self, clf, psi, coeffs, y)
-
Calculates the corrected LOO error for regression on regressor matrix `psi` that generated the coefficients, based on [1] and [2].
[1] Blatman, G., 2009. Adaptive sparse polynomial chaos expansions for uncertainty propagation and sensitivity analysis (Doctoral dissertation, Clermont-Ferrand 2).
[2] Blatman, G. and Sudret, B., 2011. Adaptive sparse polynomial chaos expansion based on least angle regression. Journal of Computational Physics, 230(6), pp.2345-2367.
Parameters
clf
:object
- Fitted estimator.
psi
:array of shape (n_samples, n_features)
- The multivariate orthogonal polynomials (regressor).
coeffs
:array-like of shape (n_features,)
- Estimated coefficients.
y
:array of shape (n_samples,)
- Target values.
Returns
Q_2
:float
- LOOCV validation score (1 - LOOCV error).
residual
:array of shape (n_samples,)
- Residual values (y - predicted targets).
Expand source code
def corr_loocv_error(self, clf, psi, coeffs, y): """ Calculates the corrected LOO error for regression on regressor matrix `psi` that generated the coefficients based on [1] and [2]. [1] Blatman, G., 2009. Adaptive sparse polynomial chaos expansions for uncertainty propagation and sensitivity analysis (Doctoral dissertation, Clermont-Ferrand 2). [2] Blatman, G. and Sudret, B., 2011. Adaptive sparse polynomial chaos expansion based on least angle regression. Journal of computational Physics, 230(6), pp.2345-2367. Parameters ---------- clf : object Fitted estimator. psi : array of shape (n_samples, n_features) The multivariate orthogonal polynomials (regressor). coeffs : array-like of shape (n_features,) Estimated cofficients. y : array of shape (n_samples,) Target values. Returns ------- Q_2 : float LOOCV Validation score (1-LOOCV erro). residual : array of shape (n_samples,) Residual values (y - predicted targets). """ psi = np.array(psi, dtype=float) # Create PSI_Sparse by removing redundent terms nnz_idx = np.nonzero(coeffs)[0] if len(nnz_idx) == 0: nnz_idx = [0] psi_sparse = psi[:, nnz_idx] # NrCoeffs of aPCEs P = len(nnz_idx) # NrEvaluation (Size of experimental design) N = psi.shape[0] # Build the projection matrix PsiTPsi = np.dot(psi_sparse.T, psi_sparse) if np.linalg.cond(PsiTPsi) > 1e-12 and \ np.linalg.cond(PsiTPsi) < 1/sys.float_info.epsilon: # faster M = sp.linalg.solve(PsiTPsi, sp.sparse.eye(PsiTPsi.shape[0]).toarray()) else: # stabler M = np.linalg.pinv(PsiTPsi) # h factor (the full matrix is not calculated explicitly, # only the trace is, to save memory) PsiM = np.dot(psi_sparse, M) h = np.sum(np.multiply(PsiM, psi_sparse), axis=1, dtype=np.float128) # ------ Calculate Error Loocv for each measurement point ---- # Residuals if isinstance(clf, list): residual = np.dot(psi, coeffs) - y else: residual = clf.predict(psi) - y # Variance varY = np.var(y) if varY == 0: normEmpErr = 0 ErrLoo = 0 LCerror = np.zeros((y.shape)) else: normEmpErr = np.mean(residual**2)/varY # LCerror = np.divide(residual, (1-h)) LCerror = residual / (1-h)[:, np.newaxis] ErrLoo = np.mean(np.square(LCerror)) / varY # if there are NaNs, just return an infinite LOO error (this # happens, e.g., when a strongly underdetermined problem is solved) if np.isnan(ErrLoo): ErrLoo = np.inf # Corrected Error for over-determined system trM = np.trace(M) if trM < 0 or abs(trM) > 1e6: trM = np.trace(np.linalg.pinv(np.dot(psi.T, psi))) # Over-determined system of Equation if N > P: T_factor = N/(N-P) * (1 + trM) # Under-determined system of Equation else: T_factor = np.inf CorrectedErrLoo = ErrLoo * T_factor Q_2 = 1 - CorrectedErrLoo return Q_2, residual
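As implemented above, the leave-one-out residuals are obtained analytically from the diagonal h of the hat matrix, normalized by the empirical variance of y, and scaled by the over-determination factor N/(N-P) * (1 + tr[(Psi_sparse^T Psi_sparse)^-1]); the returned score is Q_2 = 1 - corrected LOO error. A usage sketch (hypothetical names, reusing the dictionary returned by fit):
>>> outs = MetaModelOpts.fit(Psi, ED_Y, basis_indices)
>>> Q_2, residual = MetaModelOpts.corr_loocv_error(outs['clf_poly'],
...                                                outs['sparePsi'],
...                                                outs['coeffs'], ED_Y)
>>> loo_error = 1 - Q_2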
def pca_transformation(self, Output)
-
Transforms the targets (outputs) via Principal Component Analysis
Parameters
Output
:array of shape (n_samples,)
- Target values.
Returns
pca
:obj
- Fitted sklearnPCA object.
OutputMatrix
:array of shape (n_samples,)
- Transformed target values.
Expand source code
def pca_transformation(self, Output): """ Transforms the targets (outputs) via Principal Component Analysis Parameters ---------- Output : array of shape (n_samples,) Target values. Returns ------- pca : obj Fitted sklearnPCA object. OutputMatrix : array of shape (n_samples,) Transformed target values. """ # Transform via Principal Component Analysis if hasattr(self, 'var_pca_threshold'): var_pca_threshold = self.var_pca_threshold else: var_pca_threshold = 100.0 n_samples, n_features = Output.shape if hasattr(self, 'n_pca_components'): n_pca_components = self.n_pca_components else: # Instantiate and fit sklearnPCA object covar_matrix = sklearnPCA(n_components=None) covar_matrix.fit(Output) var = np.cumsum(np.round(covar_matrix.explained_variance_ratio_, decimals=5)*100) # Find the number of components to explain self.varPCAThreshold of # variance try: n_components = np.where(var >= var_pca_threshold)[0][0] + 1 except IndexError: n_components = min(n_samples, n_features) n_pca_components = min(n_samples, n_features, n_components) # Print out a report print() print('-' * 50) print(f"PCA transformation is performed with {n_pca_components}" " components.") print('-' * 50) print() # Fit and transform with the selected number of components pca = sklearnPCA(n_components=n_pca_components, svd_solver='randomized') OutputMatrix = pca.fit_transform(Output) return pca, OutputMatrix
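A usage sketch (hypothetical names; Output is the matrix of model responses for one output key). Setting var_pca_threshold or n_pca_components on the instance beforehand controls how many components are kept:
>>> MetaModelOpts.dim_red_method = 'PCA'
>>> MetaModelOpts.var_pca_threshold = 99.9    # keep components explaining 99.9 % of the variance
>>> pca, OutputMatrix = MetaModelOpts.pca_transformation(Output)
>>> Output_approx = pca.inverse_transform(OutputMatrix)   # back-transformation for a quick check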
def gaussian_process_emulator(self, X, y, nug_term=None, autoSelect=False, varIdx=None)
-
Fits a Gaussian Process Emulator to the target given the training points.
Parameters
X
:array of shape (n_samples, n_params)
- Training points.
y
:array of shape (n_samples,)
- Target values.
nug_term
:float, optional
- Nugget term. The default is None, i.e. variance of y.
autoSelect
:bool, optional
- Loop over some kernels and select the best. The default is False.
varIdx
:int, optional
- The index number. The default is None.
Returns
gp
:object
- Fitted estimator.
Expand source code
def gaussian_process_emulator(self, X, y, nug_term=None, autoSelect=False, varIdx=None): """ Fits a Gaussian Process Emulator to the target given the training points. Parameters ---------- X : array of shape (n_samples, n_params) Training points. y : array of shape (n_samples,) Target values. nug_term : float, optional Nugget term. The default is None, i.e. variance of y. autoSelect : bool, optional Loop over some kernels and select the best. The default is False. varIdx : int, optional The index number. The default is None. Returns ------- gp : object Fitted estimator. """ nug_term = nug_term if nug_term else np.var(y) Kernels = [nug_term * kernels.RBF(length_scale=1.0, length_scale_bounds=(1e-25, 1e15)), nug_term * kernels.RationalQuadratic(length_scale=0.2, alpha=1.0), nug_term * kernels.Matern(length_scale=1.0, length_scale_bounds=(1e-15, 1e5), nu=1.5)] # Automatic selection of the kernel if autoSelect: gp = {} BME = [] for i, kernel in enumerate(Kernels): gp[i] = GaussianProcessRegressor(kernel=kernel, n_restarts_optimizer=3, normalize_y=False) # Fit to data using Maximum Likelihood Estimation gp[i].fit(X, y) # Store the MLE as BME score BME.append(gp[i].log_marginal_likelihood()) gp = gp[np.argmax(BME)] else: gp = GaussianProcessRegressor(kernel=Kernels[0], n_restarts_optimizer=3, normalize_y=False) gp.fit(X, y) # Compute score if varIdx is not None: Score = gp.score(X, y) print('-'*50) print(f'Output variable {varIdx}:') print('The estimation of GPE coefficients converged,') print(f'with the R^2 score: {Score:.3f}') print('-'*50) return gp
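A usage sketch (hypothetical names). With autoSelect=True one regressor is fitted per candidate kernel (RBF, RationalQuadratic, Matern) and the one with the highest log marginal likelihood is kept:
>>> gp = MetaModelOpts.gaussian_process_emulator(X_train, y_train, autoSelect=True)
>>> y_mean, y_std = gp.predict(X_new, return_std=True)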
def eval_metamodel(self, samples=None, nsamples=None, sampling_method='random', return_samples=False)
-
Evaluates meta-model at the requested samples. One can also generate nsamples.
Parameters
samples
:array of shape (n_samples, n_params), optional
- Samples to evaluate meta-model at. The default is None.
nsamples
:int, optional
- Number of samples to generate, if no `samples` is provided. The default is None.
sampling_method
:str, optional
- Type of sampling, if no `samples` is provided. The default is 'random'.
return_samples
:bool, optional
- Return samples, if no `samples` is provided. The default is False.
Returns
mean_pred
:dict
- Mean of the predictions.
std_pred
:dict
- Standard deviation of the predictions.
Expand source code
def eval_metamodel(self, samples=None, nsamples=None, sampling_method='random', return_samples=False): """ Evaluates meta-model at the requested samples. One can also generate nsamples. Parameters ---------- samples : array of shape (n_samples, n_params), optional Samples to evaluate meta-model at. The default is None. nsamples : int, optional Number of samples to generate, if no `samples` is provided. The default is None. sampling_method : str, optional Type of sampling, if no `samples` is provided. The default is 'random'. return_samples : bool, optional Retun samples, if no `samples` is provided. The default is False. Returns ------- mean_pred : dict Mean of the predictions. std_pred : dict Standard deviatioon of the predictions. """ if self.meta_model_type.lower() == 'gpe': model_dict = self.gp_poly else: model_dict = self.coeffs_dict if samples is None: if nsamples is None: self.n_samples = 100000 else: self.n_samples = nsamples samples = self.ExpDesign.generate_samples(self.n_samples, sampling_method) else: self.samples = samples self.n_samples = len(samples) # Transform samples samples = self.ExpDesign.transform(samples) if self.meta_model_type.lower() != 'gpe': univ_p_val = self.univ_basis_vals(samples, n_max=np.max(self.pce_deg)) mean_pred = {} std_pred = {} # Loop over outputs for ouput, values in model_dict.items(): mean = np.zeros((len(samples), len(values))) std = np.zeros((len(samples), len(values))) idx = 0 for in_key, InIdxValues in values.items(): # Perdiction with GPE if self.meta_model_type.lower() == 'gpe': X_T = self.x_scaler[ouput].transform(samples) gp = self.gp_poly[ouput][in_key] y_mean, y_std = gp.predict(X_T, return_std=True) else: # Perdiction with PCE or pcekriging # Assemble Psi matrix psi = self.create_psi(self.basis_dict[ouput][in_key], univ_p_val) # Perdiction try: # with error bar clf_poly = self.clf_poly[ouput][in_key] y_mean, y_std = clf_poly.predict(psi, return_std=True) except: # without error bar coeffs = self.coeffs_dict[ouput][in_key] y_mean = np.dot(psi, coeffs) y_std = np.zeros_like(y_mean) mean[:, idx] = y_mean std[:, idx] = y_std idx += 1 if self.dim_red_method.lower() == 'pca': PCA = self.pca[ouput] mean_pred[ouput] = PCA.mean_ + np.dot(mean, PCA.components_) std_pred[ouput] = np.sqrt(np.dot(std**2, PCA.components_**2)) else: mean_pred[ouput] = mean std_pred[ouput] = std if return_samples: return mean_pred, std_pred, samples else: return mean_pred, std_pred
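A usage sketch for a trained surrogate (hypothetical names, with 'Z' standing in for one of the model's output keys). If no samples are passed, the experimental design object generates them with the requested sampling method:
>>> mean_pred, std_pred = MetaModelOpts.eval_metamodel(samples=my_samples)
>>> # or let the meta-model draw its own input sample
>>> mean_pred, std_pred, samples = MetaModelOpts.eval_metamodel(
...     nsamples=10000, sampling_method='random', return_samples=True)
>>> mean_pred['Z']    # predictions for output key 'Z', shape (n_samples, n_out_points)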
def create_model_error(self, X, y, name='Calib')
-
Fits a GPE-based model error.
Parameters
X
:array of shape (n_outputs, n_inputs)
- Input array. It can contain any forcing inputs or coordinates of extracted data.
y
:array of shape (n_outputs,)
- The model response for the MAP parameter set.
name
:str, optional
- Calibration or validation. The default is 'Calib'.
Returns
self
:object
- Self object.
Expand source code
def create_model_error(self, X, y, name='Calib'): """ Fits a GPE-based model error. Parameters ---------- X : array of shape (n_outputs, n_inputs) Input array. It can contain any forcing inputs or coordinates of extracted data. y : array of shape (n_outputs,) The model response for the MAP parameter set. name : str, optional Calibration or validation. The default is `'Calib'`. Returns ------- self: object Self object. """ Model = self.ModelObj outputNames = Model.Output.Names self.errorRegMethod = 'GPE' self.errorclf_poly = self.auto_vivification() self.errorScale = self.auto_vivification() # Read data MeasuredData = Model.read_observation(case=name) # Fitting GPR based bias model for out in outputNames: nan_idx = ~np.isnan(MeasuredData[out]) # Select data try: data = MeasuredData[out].values[nan_idx] except AttributeError: data = MeasuredData[out][nan_idx] # Prepare the input matrix scaler = MinMaxScaler() delta = data # - y[out][0] BiasInputs = np.hstack((X[out], y[out].reshape(-1, 1))) X_S = scaler.fit_transform(BiasInputs) gp = self.gaussian_process_emulator(X_S, delta) self.errorScale[out]["y_1"] = scaler self.errorclf_poly[out]["y_1"] = gp return self
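A usage sketch (hypothetical names; it assumes the attached model object ModelObj can provide the measured data for the given case via read_observation):
>>> MetaModelOpts.create_model_error(X_obs, y_map, name='Calib')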
def eval_model_error(self, X, y_pred)
-
Evaluates the error model.
Parameters
X
:array
- Inputs.
y_pred
:dict
- Predictions.
Returns
mean_pred
:dict
- Mean prediction of the GPE-based error model.
std_pred
:dict
- Standard deviation of the GPE-based error model.
Expand source code
def eval_model_error(self, X, y_pred): """ Evaluates the error model. Parameters ---------- X : array Inputs. y_pred : dict Predictions. Returns ------- mean_pred : dict Mean predition of the GPE-based error model. std_pred : dict standard deviation of the GPE-based error model. """ mean_pred = {} std_pred = {} for Outkey, ValuesDict in self.errorclf_poly.items(): pred_mean = np.zeros_like(y_pred[Outkey]) pred_std = np.zeros_like(y_pred[Outkey]) for Inkey, InIdxValues in ValuesDict.items(): gp = self.errorclf_poly[Outkey][Inkey] scaler = self.errorScale[Outkey][Inkey] # Transform Samples using scaler for j, pred in enumerate(y_pred[Outkey]): BiasInputs = np.hstack((X[Outkey], pred.reshape(-1, 1))) Samples_S = scaler.transform(BiasInputs) y_hat, y_std = gp.predict(Samples_S, return_std=True) pred_mean[j] = y_hat pred_std[j] = y_std # pred_mean[j] += pred mean_pred[Outkey] = pred_mean std_pred[Outkey] = pred_std return mean_pred, std_pred
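A usage sketch (hypothetical names), combining the surrogate prediction with the previously fitted GPE error model:
>>> y_pred, _ = MetaModelOpts.eval_metamodel(samples=my_samples)
>>> err_mean, err_std = MetaModelOpts.eval_model_error(X_obs, y_pred)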