diff --git a/examples/analytical-function/example_analytical_function_testSequential.py b/examples/analytical-function/example_analytical_function_testSequential.py index 9abb57ef60da9cae387c80612f850d2220b5b2f8..06dd3e25dfabaa54f167ed6d025894f328b2eb5e 100644 --- a/examples/analytical-function/example_analytical_function_testSequential.py +++ b/examples/analytical-function/example_analytical_function_testSequential.py @@ -234,18 +234,28 @@ if __name__ == "__main__": This part of the code tests the available combinations of the sequential strategies. The following combinations have remaining issues to be solved: - - exploitation = 'BayesOptDesign' has circular dependency with the engine class - exploration = 'Voronoi' creates mismatching numbers of candidates and weights - exploration = 'loocv' needs MetaModel.create_ModelError, which does not exist - exploration = 'dual annealing' restricted in what it allows as exploitation methods - tradeoff = 'adaptive' perhaps not possible in the first AL iteration? The following combinations are running through: - - BayesActDesign quite fast - - VarOptDesign as well + Tradeoff + - None + - equal + - epsilon-decreasing + Exploration + - random + - global_mc + Exploitation + - BayesActDesign + - VarOptDesign + - alphabetic + - space-filling Performance notes: - BayesOptDesign slow in the OptBayesianDesign iterations + - exploitation = 'BayesOptDesign' has circular dependency with the engine class # TODO: user-defined options were left out in this test for both sampling-methods and exploitation schemes. diff --git a/src/bayesvalidrox/surrogate_models/engine.py b/src/bayesvalidrox/surrogate_models/engine.py index 08b3a05d1fffe44123d658fe00552f0135a8d6c2..d972dba5262f03f581207a8aed6d7ed95484b3d5 100644 --- a/src/bayesvalidrox/surrogate_models/engine.py +++ b/src/bayesvalidrox/surrogate_models/engine.py @@ -663,1177 +663,6 @@ class Engine: # return self.MetaModel - # ------------------------------------------------------------------------- - def util_VarBasedDesign(self, X_can, index, util_func='Entropy'): - """ - Computes the exploitation scores based on: - active learning MacKay(ALM) and active learning Cohn (ALC) - Paper: Sequential Design with Mutual Information for Computer - Experiments (MICE): Emulation of a Tsunami Model by Beck and Guillas - (2016) - - Parameters - ---------- - X_can : array of shape (n_samples, n_params) - Candidate samples. - index : int - Model output index. - util_func : string, optional - Exploitation utility function. The default is 'Entropy'. - - Returns - ------- - float - Score. - - """ - MetaModel = self.MetaModel - ED_X = self.ExpDesign.X - out_dict_y = self.ExpDesign.Y - out_names = self.out_names - - # Run the Metamodel for the candidate - X_can = X_can.reshape(1, -1) - Y_PC_can, std_PC_can = MetaModel.eval_metamodel(samples=X_can) - - score = None - if util_func.lower() == 'alm': - # ----- Entropy/MMSE/active learning MacKay(ALM) ----- - # Compute perdiction variance of the old model - canPredVar = {key: std_PC_can[key] ** 2 for key in out_names} - - varPCE = np.zeros((len(out_names), X_can.shape[0])) - for KeyIdx, key in enumerate(out_names): - varPCE[KeyIdx] = np.max(canPredVar[key], axis=1) - score = np.max(varPCE, axis=0) - - elif util_func.lower() == 'eigf': - # ----- Expected Improvement for Global fit ----- - # Find closest EDX to the candidate - distances = distance.cdist(ED_X, X_can, 'euclidean') - index = np.argmin(distances) - - # Compute perdiction error and variance of the old model - predError = {key: Y_PC_can[key] for key in out_names} - canPredVar = {key: std_PC_can[key] ** 2 for key in out_names} - - # Compute perdiction error and variance of the old model - # Eq (5) from Liu et al.(2018) - EIGF_PCE = np.zeros((len(out_names), X_can.shape[0])) - for KeyIdx, key in enumerate(out_names): - residual = predError[key] - out_dict_y[key][int(index)] - var = canPredVar[key] - EIGF_PCE[KeyIdx] = np.max(residual ** 2 + var, axis=1) - score = np.max(EIGF_PCE, axis=0) - - return -1 * score # -1 is for minimization instead of maximization - - # ------------------------------------------------------------------------- - def util_BayesianActiveDesign(self, y_hat, std, sigma2Dict, var='DKL'): - """ - Computes scores based on Bayesian active design criterion (var). - - It is based on the following paper: - Oladyshkin, Sergey, Farid Mohammadi, Ilja Kroeker, and Wolfgang Nowak. - "Bayesian3 active learning for the gaussian process emulator using - information theory." Entropy 22, no. 8 (2020): 890. - - Parameters - ---------- - y_hat : unknown - std : unknown - sigma2Dict : dict - A dictionary containing the measurement errors (sigma^2). - var : string, optional - BAL design criterion. The default is 'DKL'. - - Returns - ------- - float - Score. - - """ - - # Get the data - obs_data = self.observations - # TODO: this should be optimizable to be calculated explicitly - if hasattr(self.Model, 'n_obs'): - n_obs = self.Model.n_obs - else: - n_obs = self.n_obs - mc_size = 10000 - - # Sample a distribution for a normal dist - # with Y_mean_can as the mean and Y_std_can as std. - Y_MC, std_MC = {}, {} - logPriorLikelihoods = np.zeros(mc_size) - for key in list(y_hat): - cov = np.diag(std[key] ** 2) - print(key, y_hat[key], std[key]) - # TODO: added the allow_singular = True here - rv = stats.multivariate_normal(mean=y_hat[key], cov=cov, allow_singular=True) - Y_MC[key] = rv.rvs(size=mc_size) - logPriorLikelihoods += rv.logpdf(Y_MC[key]) - std_MC[key] = np.zeros((mc_size, y_hat[key].shape[0])) - - # Likelihood computation (Comparison of data and simulation - # results via PCE with candidate design) - likelihoods = self._normpdf(Y_MC, std_MC, obs_data, sigma2Dict) - - # Rejection Step - # Random numbers between 0 and 1 - unif = np.random.rand(1, mc_size)[0] - - # Reject the poorly performed prior - accepted = (likelihoods / np.max(likelihoods)) >= unif - - # Prior-based estimation of BME - logBME = np.log(np.nanmean(likelihoods), dtype=np.longdouble) # float128) - - # Posterior-based expectation of likelihoods - postLikelihoods = likelihoods[accepted] - postExpLikelihoods = np.mean(np.log(postLikelihoods)) - - # Posterior-based expectation of prior densities - postExpPrior = np.mean(logPriorLikelihoods[accepted]) - - # Utility function Eq.2 in Ref. (2) - # Posterior covariance matrix after observing data y - # Kullback-Leibler Divergence (Sergey's paper) - U_J_d = None - if var == 'DKL': - - # TODO: Calculate the correction factor for BME - # BMECorrFactor = self.BME_Corr_Weight(PCE_SparseBayes_can, - # ObservationData, sigma2Dict) - # BME += BMECorrFactor - # Haun et al implementation - # U_J_d = np.mean(np.log(Likelihoods[Likelihoods!=0])- logBME) - U_J_d = postExpLikelihoods - logBME - - # Marginal log likelihood - elif var == 'BME': - U_J_d = np.nanmean(likelihoods) - - # Entropy-based information gain - elif var == 'infEntropy': - logBME = np.log(np.nanmean(likelihoods)) - infEntropy = logBME - postExpPrior - postExpLikelihoods - U_J_d = infEntropy * -1 # -1 for minimization - - # Bayesian information criterion - elif var == 'BIC': - coeffs = self.MetaModel.coeffs_dict.values() - nModelParams = max(len(v) for val in coeffs for v in val.values()) - maxL = np.nanmax(likelihoods) - U_J_d = -2 * np.log(maxL) + np.log(n_obs) * nModelParams - - # Akaike information criterion - elif var == 'AIC': - coeffs = self.MetaModel.coeffs_dict.values() - nModelParams = max(len(v) for val in coeffs for v in val.values()) - maxlogL = np.log(np.nanmax(likelihoods)) - AIC = -2 * maxlogL + 2 * nModelParams - # 2 * nModelParams * (nModelParams+1) / (n_obs-nModelParams-1) - penTerm = 0 - U_J_d = 1 * (AIC + penTerm) - - # Deviance information criterion - elif var == 'DIC': - # D_theta_bar = np.mean(-2 * Likelihoods) - N_star_p = 0.5 * np.var(np.log(likelihoods[likelihoods != 0])) - Likelihoods_theta_mean = self._normpdf( - y_hat, std, obs_data, sigma2Dict - ) - DIC = -2 * np.log(Likelihoods_theta_mean) + 2 * N_star_p - - U_J_d = DIC - - else: - print('The algorithm you requested has not been implemented yet!') - - # Handle inf and NaN (replace by zero) - if np.isnan(U_J_d) or U_J_d == -np.inf or U_J_d == np.inf: - U_J_d = 0.0 - - # Clear memory - del likelihoods - del Y_MC - del std_MC - - return -1 * U_J_d # -1 is for minimization instead of maximization - - # ------------------------------------------------------------------------- - def util_BayesianDesign(self, X_can, X_MC, sigma2Dict, var='DKL'): - """ - Computes scores based on Bayesian sequential design criterion (var). - - Parameters - ---------- - X_can : array of shape (n_samples, n_params) - Candidate samples. - X_MC : unknown - sigma2Dict : dict - A dictionary containing the measurement errors (sigma^2). - var : string, optional - Bayesian design criterion. The default is 'DKL'. - - Returns - ------- - float - Score. - - """ - - # To avoid changes ub original aPCE object - MetaModel = self.MetaModel - out_names = self.out_names - if X_can.ndim == 1: - X_can = X_can.reshape(1, -1) - - # Compute the mean and std based on the MetaModel - # pce_means, pce_stds = self._compute_pce_moments(MetaModel) - if var == 'ALC': - Y_MC, Y_MC_std = MetaModel.eval_metamodel(samples=X_MC) - - # Old Experimental design - oldExpDesignX = self.ExpDesign.X - oldExpDesignY = self.ExpDesign.Y - - # Evaluate the PCE metamodels at that location ??? - Y_PC_can, Y_std_can = MetaModel.eval_metamodel(samples=X_can) - PCE_Model_can = deepcopy(MetaModel) - engine_can = deepcopy(self) - # Add the candidate to the ExpDesign - NewExpDesignX = np.vstack((oldExpDesignX, X_can)) - - NewExpDesignY = {} - for key in oldExpDesignY.keys(): - NewExpDesignY[key] = np.vstack( - (oldExpDesignY[key], Y_PC_can[key]) - ) - - engine_can.ExpDesign.sampling_method = 'user' - engine_can.ExpDesign.X = NewExpDesignX - # engine_can.ModelOutputDict = NewExpDesignY - engine_can.ExpDesign.Y = NewExpDesignY - - # Train the model for the observed data using x_can - engine_can.MetaModel.input_obj.poly_coeffs_flag = False - engine_can.start_engine() - engine_can.train_normal(parallel=False) - engine_can.MetaModel.fit(NewExpDesignX, NewExpDesignY) - # engine_can.train_norm_design(parallel=False) - - # Set the ExpDesign to its original values - engine_can.ExpDesign.X = oldExpDesignX - engine_can.ModelOutputDict = oldExpDesignY - engine_can.ExpDesign.Y = oldExpDesignY - - if var.lower() == 'mi': - # Mutual information based on Krause et al. - # Adapted from Beck & Guillas (MICE) paper - _, std_PC_can = engine_can.MetaModel.eval_metamodel(samples=X_can) - std_can = {key: std_PC_can[key] for key in out_names} - - std_old = {key: Y_std_can[key] for key in out_names} - - varPCE = np.zeros((len(out_names))) - for i, key in enumerate(out_names): - varPCE[i] = np.mean(std_old[key] ** 2 / std_can[key] ** 2) - score = np.mean(varPCE) - - return -1 * score - - elif var.lower() == 'alc': - # Active learning based on Gramyc and Lee - # Adaptive design and analysis of supercomputer experiments Techno- - # metrics, 51 (2009), pp. 130–145. - - # Evaluate the MetaModel at the given samples - Y_MC_can, Y_MC_std_can = engine_can.MetaModel.eval_metamodel(samples=X_MC) - - # Compute the score - score = [] - for i, key in enumerate(out_names): - pce_var = Y_MC_std_can[key] ** 2 - pce_var_can = Y_MC_std[key] ** 2 - score.append(np.mean(pce_var - pce_var_can, axis=0)) - score = np.mean(score) - - return -1 * score - - # ---------- Inner MC simulation for computing Utility Value ---------- - # Estimation of the integral via Monte Varlo integration - MCsize = X_MC.shape[0] - ESS = 0 - - likelihoods = None - while (ESS > MCsize) or (ESS < 1): - - # Enriching Monte Carlo samples if need be - if ESS != 0: - X_MC = self.ExpDesign.generate_samples( - MCsize, 'random' - ) - - # Evaluate the MetaModel at the given samples - Y_MC, std_MC = PCE_Model_can.eval_metamodel(samples=X_MC) - - # Likelihood computation (Comparison of data and simulation - # results via PCE with candidate design) - likelihoods = self._normpdf( - Y_MC, std_MC, self.observations, sigma2Dict - ) - - # Check the Effective Sample Size (1<ESS<MCsize) - ESS = 1 / np.sum(np.square(likelihoods / np.sum(likelihoods))) - - # Enlarge sample size if it doesn't fulfill the criteria - if (ESS > MCsize) or (ESS < 1): - print("--- increasing MC size---") - MCsize *= 10 - ESS = 0 - - # Rejection Step - # Random numbers between 0 and 1 - unif = np.random.rand(1, MCsize)[0] - - # Reject the poorly performed prior - accepted = (likelihoods / np.max(likelihoods)) >= unif - - # -------------------- Utility functions -------------------- - # Utility function Eq.2 in Ref. (2) - # Kullback-Leibler Divergence (Sergey's paper) - U_J_d = None - if var == 'DKL': - - # Prior-based estimation of BME - logBME = np.log(np.nanmean(likelihoods, dtype=np.longdouble)) # float128)) - - # Posterior-based expectation of likelihoods - # postLikelihoods = likelihoods[accepted] - # postExpLikelihoods = np.mean(np.log(postLikelihoods)) - - # Haun et al implementation - U_J_d = np.mean(np.log(likelihoods[likelihoods != 0]) - logBME) - - # U_J_d = np.sum(G_n_m_all) - # Ryan et al (2014) implementation - # importanceWeights = Likelihoods[Likelihoods!=0]/np.sum(Likelihoods[Likelihoods!=0]) - # U_J_d = np.mean(importanceWeights*np.log(Likelihoods[Likelihoods!=0])) - logBME - - # U_J_d = postExpLikelihoods - logBME - - # Marginal likelihood - elif var == 'BME': - - # Prior-based estimation of BME - logBME = np.log(np.nanmean(likelihoods)) - U_J_d = logBME - - # Bayes risk likelihood - elif var == 'BayesRisk': - - U_J_d = -1 * np.var(likelihoods) - - # Entropy-based information gain - elif var == 'infEntropy': - # Prior-based estimation of BME - logBME = np.log(np.nanmean(likelihoods)) - - # Posterior-based expectation of likelihoods - postLikelihoods = likelihoods[accepted] - postLikelihoods /= np.nansum(likelihoods[accepted]) - postExpLikelihoods = np.mean(np.log(postLikelihoods)) - - # Posterior-based expectation of prior densities - logPriorLikelihoods = [] - logPriorLikelihoods[accepted] = None # TODO: this is not defined here, just a fix - postExpPrior = np.mean(logPriorLikelihoods[accepted]) - - infEntropy = logBME - postExpPrior - postExpLikelihoods - - U_J_d = infEntropy * -1 # -1 for minimization - - # D-Posterior-precision - elif var == 'DPP': - X_Posterior = X_MC[accepted] - # covariance of the posterior parameters - U_J_d = -np.log(np.linalg.det(np.cov(X_Posterior))) - - # A-Posterior-precision - elif var == 'APP': - X_Posterior = X_MC[accepted] - # trace of the posterior parameters - U_J_d = -np.log(np.trace(np.cov(X_Posterior))) - - else: - print('The algorithm you requested has not been implemented yet!') - - # Clear memory - del likelihoods - del Y_MC - del std_MC - - return -1 * U_J_d # -1 is for minimization instead of maximization - - # ------------------------------------------------------------------------- - def run_util_func(self, method, candidates, index, sigma2Dict=None, - var=None, X_MC=None): - """ - Runs the utility function based on the given method. - - Parameters - ---------- - method : string - Exploitation method: `VarOptDesign`, `BayesActDesign` and - `BayesOptDesign`. - candidates : array of shape (n_samples, n_params) - All candidate parameter sets. - index : int - ExpDesign index. - sigma2Dict : dict, optional - A dictionary containing the measurement errors (sigma^2). The - default is None. - var : string, optional - Utility function. The default is None. - X_MC : TYPE, optional - DESCRIPTION. The default is None. - - Returns - ------- - index : TYPE - DESCRIPTION. - List - Scores. - - """ - - if method.lower() == 'varoptdesign': - # U_J_d = self.util_VarBasedDesign(candidates, index, var) - U_J_d = np.zeros((candidates.shape[0])) - for idx, X_can in tqdm(enumerate(candidates), ascii=True, - desc="varoptdesign"): - U_J_d[idx] = self.util_VarBasedDesign(X_can, index, var) - - elif method.lower() == 'bayesactdesign': - NCandidate = candidates.shape[0] - U_J_d = np.zeros(NCandidate) - # Evaluate all candidates - y_can, std_can = self.MetaModel.eval_metamodel(samples=candidates) - # loop through candidates - for idx, X_can in tqdm(enumerate(candidates), ascii=True, - desc="BAL Design"): - y_hat = {key: items[idx] for key, items in y_can.items()} - std = {key: items[idx] for key, items in std_can.items()} - - # print(y_hat) - # print(std) - U_J_d[idx] = self.util_BayesianActiveDesign( - y_hat, std, sigma2Dict, var) - - elif method.lower() == 'bayesoptdesign': - NCandidate = candidates.shape[0] - U_J_d = np.zeros(NCandidate) - for idx, X_can in tqdm(enumerate(candidates), ascii=True, - desc="OptBayesianDesign"): - U_J_d[idx] = self.util_BayesianDesign(X_can, X_MC, sigma2Dict, - var) - return index, -1 * U_J_d - - # ------------------------------------------------------------------------- - def dual_annealing(self, method, Bounds, sigma2Dict, var, Run_No, - verbose=False): - """ - Exploration algorithm to find the optimum parameter space. - - Parameters - ---------- - method : string - Exploitation method: `VarOptDesign`, `BayesActDesign` and - `BayesOptDesign`. - # TODO: BayesActDesign has no corresponding function call in this function! - Bounds : list of tuples - List of lower and upper boundaries of parameters. - sigma2Dict : dict - A dictionary containing the measurement errors (sigma^2). - var : unknown - Run_No : int - Run number. - verbose : bool, optional - Print out a summary. The default is False. - - Returns - ------- - Run_No : int - Run number. - array - Optimial candidate. - - """ - - Model = self.Model - max_func_itr = self.ExpDesign.max_func_itr - - Res_Global = None - if method.lower() == 'varoptdesign': - Res_Global = opt.dual_annealing(self.util_VarBasedDesign, - bounds=Bounds, - args=(Model, var), - maxfun=max_func_itr) - - elif method.lower() == 'bayesoptdesign': - Res_Global = opt.dual_annealing(self.util_BayesianDesign, - bounds=Bounds, - args=(Model, sigma2Dict, var), - maxfun=max_func_itr) - - if verbose: - print(f"Global minimum: xmin = {Res_Global.x}, " - f"f(xmin) = {Res_Global.fun:.6f}, nfev = {Res_Global.nfev}") - - return Run_No, Res_Global.x - - # ------------------------------------------------------------------------- - def tradeoff_weights(self, tradeoff_scheme, old_EDX, old_EDY): - """ - Calculates weights for exploration scores based on the requested - scheme: `None`, `equal`, `epsilon-decreasing` and `adaptive`. - - `None`: No exploration. - `equal`: Same weights for exploration and exploitation scores. - `epsilon-decreasing`: Start with more exploration and increase the - influence of exploitation along the way with an exponential decay - function - `adaptive`: An adaptive method based on: - Liu, Haitao, Jianfei Cai, and Yew-Soon Ong. "An adaptive sampling - approach for Kriging metamodeling by maximizing expected prediction - error." Computers & Chemical Engineering 106 (2017): 171-182. - - Parameters - ---------- - tradeoff_scheme : string - Trade-off scheme for exloration and exploitation scores. - old_EDX : array (n_samples, n_params) - Old experimental design (training points). - old_EDY : dict - Old model responses (targets). - - Returns - ------- - exploration_weight : float - Exploration weight. - exploitation_weight: float - Exploitation weight. - - """ - exploration_weight = None - - if tradeoff_scheme is None: - exploration_weight = 0 - - elif tradeoff_scheme == 'equal': - exploration_weight = 0.5 - - elif tradeoff_scheme == 'epsilon-decreasing': - # epsilon-decreasing scheme - # Start with more exploration and increase the influence of - # exploitation along the way with an exponential decay function - initNSamples = self.ExpDesign.n_init_samples - n_max_samples = self.ExpDesign.n_max_samples - - itrNumber = (self.ExpDesign.X.shape[0] - initNSamples) - itrNumber //= self.ExpDesign.n_new_samples - - tau2 = -(n_max_samples - initNSamples - 1) / np.log(1e-8) - exploration_weight = signal.exponential(n_max_samples - initNSamples, - 0, tau2, False)[itrNumber] - - elif tradeoff_scheme == 'adaptive': - - # Extract itrNumber - initNSamples = self.ExpDesign.n_init_samples - # n_max_samples = self.ExpDesign.n_max_samples - itrNumber = (self.ExpDesign.X.shape[0] - initNSamples) - itrNumber //= self.ExpDesign.n_new_samples - - if itrNumber == 0: - exploration_weight = 0.5 - else: - # New adaptive trade-off according to Liu et al. (2017) - # Mean squared error for last design point - last_EDX = old_EDX[-1].reshape(1, -1) - lastPCEY, _ = self.MetaModel.eval_metamodel(samples=last_EDX) - pce_y = np.array(list(lastPCEY.values()))[:, 0] - y = np.array(list(old_EDY.values()))[:, -1, :] - mseError = mean_squared_error(pce_y, y) - - # Mean squared CV - error for last design point - pce_y_prev = np.array(list(self._y_hat_prev.values()))[:, 0] - mseCVError = mean_squared_error(pce_y_prev, y) - - exploration_weight = min([0.5 * mseError / mseCVError, 1]) - - # Exploitation weight - exploitation_weight = 1 - exploration_weight - - return exploration_weight, exploitation_weight - - # ------------------------------------------------------------------------- - def choose_next_sample(self, sigma2=None, n_candidates=5, var='DKL'): - """ - Runs optimal sequential design. - - Parameters - ---------- - sigma2 : dict, optional - A dictionary containing the measurement errors (sigma^2). The - default is None. - n_candidates : int, optional - Number of candidate samples. The default is 5. - var : string, optional - Utility function. The default is None. # TODO: default is set to DKL, not none - - Raises - ------ - NameError - Wrong utility function. - - Returns - ------- - Xnew : array (n_samples, n_params) - Selected new training point(s). - """ - - # Initialization - Bounds = self.ExpDesign.bound_tuples - n_new_samples = self.ExpDesign.n_new_samples - explore_method = self.ExpDesign.explore_method - exploit_method = self.ExpDesign.exploit_method - n_cand_groups = self.ExpDesign.n_cand_groups - tradeoff_scheme = self.ExpDesign.tradeoff_scheme - - old_EDX = self.ExpDesign.X - old_EDY = self.ExpDesign.Y.copy() - ndim = self.ExpDesign.X.shape[1] - OutputNames = self.out_names - - # ----------------------------------------- - # ----------- CUSTOMIZED METHODS ---------- - # ----------------------------------------- - # Utility function exploit_method provided by user - if exploit_method.lower() == 'user': - # TODO: is the exploit_method meant here? - if not hasattr(self.ExpDesign, 'ExploitFunction') or self.ExpDesign.ExploitFunction is None: - raise AttributeError( - 'Function `ExploitFunction` not given to the ExpDesign, thus cannor run user-defined sequential' - 'scheme') - # TODO: syntax does not fully match the rest - can test this?? - Xnew, filteredSamples = self.ExpDesign.ExploitFunction(self) - - print("\n") - print("\nXnew:\n", Xnew) - - return Xnew, filteredSamples - - # Dual-Annealing works differently from the rest, so deal with this first - # Here exploration and exploitation are performed simulataneously - if explore_method == 'dual annealing': - # ------- EXPLORATION: OPTIMIZATION ------- - import time - start_time = time.time() - - # Divide the domain to subdomains - subdomains = subdomain(Bounds, n_new_samples) - - # Multiprocessing - if self.parallel: - args = [] - for i in range(n_new_samples): - args.append((exploit_method, subdomains[i], sigma2, var, i)) - pool = multiprocessing.Pool(multiprocessing.cpu_count()) - - # With Pool.starmap_async() - results = pool.starmap_async(self.dual_annealing, args).get() - - # Close the pool - pool.close() - # Without multiprocessing - else: - results = [] - for i in range(n_new_samples): - results.append(self.dual_annealing(exploit_method, subdomains[i], sigma2, var, i)) - - # New sample - Xnew = np.array([results[i][1] for i in range(n_new_samples)]) - print("\nXnew:\n", Xnew) - - # Computational cost - elapsed_time = time.time() - start_time - print("\n") - print(f"Elapsed_time: {round(elapsed_time, 2)} sec.") - print('-' * 20) - - return Xnew, None - - # Generate needed Exploration class - explore = Exploration(self.ExpDesign, n_candidates) - explore.w = 100 # * ndim #500 # TODO: where does this value come from? - - # Select criterion (mc-intersite-proj-th, mc-intersite-proj) - explore.mc_criterion = 'mc-intersite-proj' - - # Generate the candidate samples - # TODO: here use the sampling method provided by the expdesign? - # sampling_method = self.ExpDesign.sampling_method - - # TODO: changed this from 'random' for LOOCV - # TODO: these are commented out as they are not used !? - # if explore_method == 'LOOCV': - # allCandidates = self.ExpDesign.generate_samples(n_candidates, - # sampling_method) - # else: - # allCandidates, scoreExploration = explore.get_exploration_samples() - - # ----------------------------------------- - # ---------- EXPLORATION METHODS ---------- - # ----------------------------------------- - if explore_method == 'LOOCV': - # ----------------------------------------------------------------- - # TODO: LOOCV model construnction based on Feng et al. (2020) - # 'LOOCV': - # Initilize the ExploitScore array - - # Generate random samples - allCandidates = self.ExpDesign.generate_samples(n_candidates, - 'random') - - # Construct error model based on LCerror - errorModel = self.MetaModel.create_ModelError(old_EDX, self.LCerror) - self.errorModel.append(copy(errorModel)) - - # Evaluate the error models for allCandidates - eLCAllCands, _ = errorModel.eval_errormodel(allCandidates) - # Select the maximum as the representative error - eLCAllCands = np.dstack(eLCAllCands.values()) - eLCAllCandidates = np.max(eLCAllCands, axis=1)[:, 0] - - # Normalize the error w.r.t the maximum error - scoreExploration = eLCAllCandidates / np.sum(eLCAllCandidates) - - else: - # ------- EXPLORATION: SPACE-FILLING DESIGN ------- - # Generate candidate samples from Exploration class - explore = Exploration(self.ExpDesign, n_candidates) - explore.w = 100 # * ndim #500 - # Select criterion (mc-intersite-proj-th, mc-intersite-proj) - explore.mc_criterion = 'mc-intersite-proj' - allCandidates, scoreExploration = explore.get_exploration_samples() - - # Temp: ---- Plot all candidates ----- - if ndim == 2: - def plotter(points, allCandidates, Method, - scoreExploration=None): - """ - unknown - - Parameters - ---------- - points - allCandidates - Method - scoreExploration - - Returns - ------- - - """ - if Method == 'Voronoi': - from scipy.spatial import Voronoi, voronoi_plot_2d - vor = Voronoi(points) - fig = voronoi_plot_2d(vor) - ax1 = fig.axes[0] - else: - fig = plt.figure() - ax1 = fig.add_subplot(111) - ax1.scatter(points[:, 0], points[:, 1], s=10, c='r', - marker="s", label='Old Design Points') - ax1.scatter(allCandidates[:, 0], allCandidates[:, 1], s=10, - c='b', marker="o", label='Design candidates') - for i in range(points.shape[0]): - txt = 'p' + str(i + 1) - ax1.annotate(txt, (points[i, 0], points[i, 1])) - if scoreExploration is not None: - for i in range(allCandidates.shape[0]): - txt = str(round(scoreExploration[i], 5)) - ax1.annotate(txt, (allCandidates[i, 0], - allCandidates[i, 1])) - - plt.xlim(self.bound_tuples[0]) - plt.ylim(self.bound_tuples[1]) - # plt.show() - plt.legend(loc='upper left') - - # ----------------------------------------- - # --------- EXPLOITATION METHODS ---------- - # ----------------------------------------- - if exploit_method.lower() == 'bayesoptdesign' or \ - exploit_method.lower() == 'bayesactdesign': - - # ------- Calculate Exoploration weight ------- - # Compute exploration weight based on trade off scheme - explore_w, exploit_w = self.tradeoff_weights(tradeoff_scheme, - old_EDX, - old_EDY) - print(f"\n Exploration weight={explore_w:0.3f} " - f"Exploitation weight={exploit_w:0.3f}\n") - - # ------- EXPLOITATION: BayesOptDesign & ActiveLearning ------- - if explore_w != 1.0: - # Check if all needed properties are set - if not hasattr(self.ExpDesign, 'max_func_itr'): - raise AttributeError('max_func_itr not given to the experimental design') - - # Create a sample pool for rejection sampling - MCsize = 15000 - X_MC = self.ExpDesign.generate_samples(MCsize, 'random') - candidates = self.ExpDesign.generate_samples( - n_candidates, 'latin_hypercube') - - # Split the candidates in groups for multiprocessing - split_cand = np.array_split( - candidates, n_cand_groups, axis=0 - ) - # print(candidates) - # print(split_cand) - if self.parallel: - results = Parallel(n_jobs=-1, backend='multiprocessing')( - delayed(self.run_util_func)( - exploit_method, split_cand[i], i, sigma2, var, X_MC) - for i in range(n_cand_groups)) - else: - results = [] - for i in range(n_cand_groups): - results.append(self.run_util_func(exploit_method, split_cand[i], i, sigma2, var, X_MC)) - - # Retrieve the results and append them - U_J_d = np.concatenate([results[NofE][1] for NofE in - range(n_cand_groups)]) - - # Check if all scores are inf - if np.isinf(U_J_d).all() or np.isnan(U_J_d).all(): - U_J_d = np.ones(len(U_J_d)) - - # Get the expected value (mean) of the Utility score - # for each cell - if explore_method == 'Voronoi': - U_J_d = np.mean(U_J_d.reshape(-1, n_candidates), axis=1) - - # Normalize U_J_d - norm_U_J_d = U_J_d / np.sum(U_J_d) - else: - norm_U_J_d = np.zeros((len(scoreExploration))) - - # ------- Calculate Total score ------- - # ------- Trade off between EXPLORATION & EXPLOITATION ------- - # Accumulate the samples - finalCandidates = np.concatenate((allCandidates, candidates), axis=0) - finalCandidates = np.unique(finalCandidates, axis=0) - - # Calculations take into account both exploration and exploitation - # samples without duplicates - totalScore = np.zeros(finalCandidates.shape[0]) - # self.totalScore = totalScore - - for cand_idx in range(finalCandidates.shape[0]): - # find candidate indices - idx1 = np.where(allCandidates == finalCandidates[cand_idx])[0] - idx2 = np.where(candidates == finalCandidates[cand_idx])[0] - - # exploration - if idx1.shape[0] > 0: - idx1 = idx1[0] - totalScore[cand_idx] += explore_w * scoreExploration[idx1] - - # exploitation - if idx2.shape[0] > 0: - idx2 = idx2[0] - totalScore[cand_idx] += exploit_w * norm_U_J_d[idx2] - - # Total score - totalScore = exploit_w * norm_U_J_d - totalScore += explore_w * scoreExploration - - # temp: Plot - # dim = self.ExpDesign.X.shape[1] - # if dim == 2: - # plotter(self.ExpDesign.X, allCandidates, explore_method) - - # ------- Select the best candidate ------- - # find an optimal point subset to add to the initial design by - # maximization of the utility score and taking care of NaN values - temp = totalScore.copy() - temp[np.isnan(totalScore)] = -np.inf - sorted_idxtotalScore = np.argsort(temp)[::-1] - bestIdx = sorted_idxtotalScore[:n_new_samples] - - # select the requested number of samples - if explore_method == 'Voronoi': - Xnew = np.zeros((n_new_samples, ndim)) - for i, idx in enumerate(bestIdx): - X_can = explore.closestPoints[idx] - - # Calculate the maxmin score for the region of interest - newSamples, maxminScore = explore.get_mc_samples(X_can) - - # select the requested number of samples - Xnew[i] = newSamples[np.argmax(maxminScore)] - else: - # Changed this from allCandiates to full set of candidates - # TODO: still not changed for e.g. 'Voronoi' - Xnew = finalCandidates[sorted_idxtotalScore[:n_new_samples]] - - - elif exploit_method.lower() == 'varoptdesign': - # ------- EXPLOITATION: VarOptDesign ------- - UtilMethod = var - - # ------- Calculate Exoploration weight ------- - # Compute exploration weight based on trade off scheme - explore_w, exploit_w = self.tradeoff_weights(tradeoff_scheme, - old_EDX, - old_EDY) - print(f"\nweightExploration={explore_w:0.3f} " - f"weightExploitation={exploit_w:0.3f}") - - # Generate candidate samples from Exploration class - nMeasurement = old_EDY[OutputNames[0]].shape[1] - - # print(UtilMethod) - - # Find sensitive region - if UtilMethod == 'LOOCV': - # TODO: why is this not inside the VarOptDesign function? - LCerror = self.MetaModel.LCerror - allModifiedLOO = np.zeros((len(old_EDX), len(OutputNames), - nMeasurement)) - for y_idx, y_key in enumerate(OutputNames): - for idx, key in enumerate(LCerror[y_key].keys()): - allModifiedLOO[:, y_idx, idx] = abs( - LCerror[y_key][key]) - - ExploitScore = np.max(np.max(allModifiedLOO, axis=1), axis=1) - - elif UtilMethod in ['EIGF', 'ALM']: - # ----- All other in ['EIGF', 'ALM'] ----- - # Initilize the ExploitScore array - # ExploitScore = np.zeros((len(old_EDX), len(OutputNames))) - - # Split the candidates in groups for multiprocessing - if explore_method != 'Voronoi': - split_cand = np.array_split(allCandidates, - n_cand_groups, - axis=0) - goodSampleIdx = range(n_cand_groups) - else: - # Find indices of the Vornoi cells with samples - goodSampleIdx = [] - for idx in range(len(explore.closest_points)): - if len(explore.closest_points[idx]) != 0: - goodSampleIdx.append(idx) - split_cand = explore.closest_points - - # Split the candidates in groups for multiprocessing - args = [] - for index in goodSampleIdx: - args.append((exploit_method, split_cand[index], index, - sigma2, var)) - - # Multiprocessing - pool = multiprocessing.Pool(multiprocessing.cpu_count()) - # With Pool.starmap_async() - results = pool.starmap_async(self.run_util_func, args).get() - - # Close the pool - pool.close() - - # Retrieve the results and append them - if explore_method == 'Voronoi': - ExploitScore = [np.mean(results[k][1]) for k in - range(len(goodSampleIdx))] - else: - ExploitScore = np.concatenate( - [results[k][1] for k in range(len(goodSampleIdx))]) - - else: - raise NameError('The requested utility function is not ' - 'available.') - - # print("ExploitScore:\n", ExploitScore) - - # find an optimal point subset to add to the initial design by - # maximization of the utility score and taking care of NaN values - # Total score - # Normalize U_J_d - ExploitScore = ExploitScore / np.sum(ExploitScore) - totalScore = exploit_w * ExploitScore - # print(totalScore.shape) - # print(explore_w) - # print(scoreExploration.shape) - totalScore += explore_w * scoreExploration - - temp = totalScore.copy() - sorted_idxtotalScore = np.argsort(temp, axis=0)[::-1] - bestIdx = sorted_idxtotalScore[:n_new_samples] - - Xnew = np.zeros((n_new_samples, ndim)) - if explore_method != 'Voronoi': - Xnew = allCandidates[bestIdx] - else: - for i, idx in enumerate(bestIdx.flatten()): - X_can = explore.closest_points[idx] - # plotter(self.ExpDesign.X, X_can, explore_method, - # scoreExploration=None) - - # Calculate the maxmin score for the region of interest - newSamples, maxminScore = explore.get_mc_samples(X_can) - - # select the requested number of samples - Xnew[i] = newSamples[np.argmax(maxminScore)] - - elif exploit_method.lower() == 'alphabetic': - # ------- EXPLOITATION: ALPHABETIC ------- - Xnew = self.util_AlphOptDesign(allCandidates, var) - - elif exploit_method == 'Space-filling': - # ------- EXPLOITATION: SPACE-FILLING ------- - totalScore = scoreExploration - - # ------- Select the best candidate ------- - # find an optimal point subset to add to the initial design by - # maximization of the utility score and taking care of NaN values - temp = totalScore.copy() - temp[np.isnan(totalScore)] = -np.inf - sorted_idxtotalScore = np.argsort(temp)[::-1] - - # select the requested number of samples - Xnew = allCandidates[sorted_idxtotalScore[:n_new_samples]] - - else: - raise NameError('The requested design method is not available.') - - print("\n") - print("\nRun No. {}:".format(old_EDX.shape[0] + 1)) - print("Xnew:\n", Xnew) - - # TODO: why does it also return None? - return Xnew, None - - # ------------------------------------------------------------------------- - def util_AlphOptDesign(self, candidates, var='D-Opt'): - """ - Enriches the Experimental design with the requested alphabetic - criterion based on exploring the space with number of sampling points. - - Ref: Hadigol, M., & Doostan, A. (2018). Least squares polynomial chaos - expansion: A review of sampling strategies., Computer Methods in - Applied Mechanics and Engineering, 332, 382-407. - - Arguments - --------- - candidates : int? - Number of candidate points to be searched - - var : string - Alphabetic optimality criterion - - Returns - ------- - X_new : array of shape (1, n_params) - The new sampling location in the input space. - """ - MetaModelOrig = self # TODO: this doesn't fully seem correct? - n_new_samples = MetaModelOrig.ExpDesign.n_new_samples - NCandidate = candidates.shape[0] - - # TODO: Loop over outputs - OutputName = self.out_names[0] - - # To avoid changes ub original aPCE object - # MetaModel = deepcopy(MetaModelOrig) - - # Old Experimental design - oldExpDesignX = self.ExpDesign.X - - # TODO: Only one psi can be selected. - # Suggestion: Go for the one with the highest LOO error - # TODO: this is just a patch, need to look at again! - Scores = list(self.MetaModel.score_dict['b_1'][OutputName].values()) - ModifiedLOO = [1 - score for score in Scores] - outIdx = np.argmax(ModifiedLOO) - - # Initialize Phi to save the criterion's values - Phi = np.zeros(NCandidate) - - # TODO: also patched here - BasisIndices = self.MetaModel.basis_dict['b_1'][OutputName]["y_" + str(outIdx + 1)] - P = len(BasisIndices) - - # ------ Old Psi ------------ - univ_p_val = self.MetaModel.univ_basis_vals(oldExpDesignX) - Psi = self.MetaModel.create_psi(BasisIndices, univ_p_val) - - # ------ New candidates (Psi_c) ------------ - # Assemble Psi_c - univ_p_val_c = self.MetaModel.univ_basis_vals(candidates) - Psi_c = self.MetaModel.create_psi(BasisIndices, univ_p_val_c) - - for idx in range(NCandidate): - - # Include the new row to the original Psi - Psi_cand = np.vstack((Psi, Psi_c[idx])) - - # Information matrix - PsiTPsi = np.dot(Psi_cand.T, Psi_cand) - M = PsiTPsi / (len(oldExpDesignX) + 1) - - if 1e-12 < np.linalg.cond(PsiTPsi) < 1 / sys.float_info.epsilon: - # faster - invM = linalg.solve(M, sparse.eye(PsiTPsi.shape[0]).toarray()) - else: - # stabler - invM = np.linalg.pinv(M) - - # ---------- Calculate optimality criterion ---------- - # Optimality criteria according to Section 4.5.1 in Ref. - - # D-Opt - if var.lower() == 'd-opt': - Phi[idx] = (np.linalg.det(invM)) ** (1 / P) - - # A-Opt - elif var.lower() == 'a-opt': - Phi[idx] = np.trace(invM) - - # K-Opt - elif var.lower() == 'k-opt': - Phi[idx] = np.linalg.cond(M) - - else: - # print(var.lower()) - raise Exception('The optimality criterion you requested has ' - 'not been implemented yet!') - - # find an optimal point subset to add to the initial design - # by minimization of the Phi - sorted_idxtotalScore = np.argsort(Phi) - - # select the requested number of samples - Xnew = candidates[sorted_idxtotalScore[:n_new_samples]] - - return Xnew - # ------------------------------------------------------------------------- def _normpdf(self, y_hat_pce, std_pce, obs_data, total_sigma2s, rmse=None):