diff --git a/src/bayesvalidrox/surrogate_models/exp_designs.py b/src/bayesvalidrox/surrogate_models/exp_designs.py
index a9c61eddfb5da41b62e0ab0377c9eb0aaa0d951b..49a56e1ce26c6449cbf2d88ac639fa142281881a 100644
--- a/src/bayesvalidrox/surrogate_models/exp_designs.py
+++ b/src/bayesvalidrox/surrogate_models/exp_designs.py
@@ -48,6 +48,7 @@ class ExpDesigns:
         self.step_snapshot = step_snapshot
         self.max_a_post = max_a_post
 
+    # -------------------------------------------------------------------------
     def generate_samples(self, n_samples, sampling_method='random',
                          transform=False):
         """
@@ -82,6 +83,7 @@ class ExpDesigns:
         else:
             return samples.T
 
+    # -------------------------------------------------------------------------
     def generate_ED(self, n_samples, sample_method='random', transform=False,
                     max_pce_deg=None):
         """
@@ -100,7 +102,7 @@ class ExpDesigns:
 
         Returns
         -------
-        array of shape (n_samples, n_params)
+        samples : array of shape (n_samples, n_params)
             Selected training samples.
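+
+        Examples
+        --------
+        A minimal, illustrative call; the `inputs` set-up is assumed here
+        and is not part of this patch:
+
+        >>> exp_design = ExpDesigns(inputs)
+        >>> samples = exp_design.generate_ED(10, sample_method='random')
+        >>> samples.shape   # -> (10, n_params)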
 
         """
@@ -109,62 +111,58 @@ class ExpDesigns:
         if not hasattr(self, 'n_init_samples'):
             self.n_init_samples = self.ndim + 1
         n_samples = int(n_samples)
-        if len(Inputs.Marginals[0].input_data) == 0:
-            self.arbitrary = False
+
+        # Check whether an aPCE metamodel is selected.
+        if self.meta_Model.lower() == 'apce':
+            self.apce = True
+        else:
+            self.apce = False
+
+        # Check whether the input is given as dist or input_data.
+        if len(Inputs.Marginals[0].input_data):
+            self.input_data_given = True
         else:
-            self.arbitrary = True
+            self.input_data_given = False
 
         # Get the bounds if input_data are directly defined by user:
-        if Inputs.Marginals[0].parameters is None:
+        if self.input_data_given:
             for i in range(self.ndim):
                 low_bound = np.min(Inputs.Marginals[i].input_data)
                 up_bound = np.max(Inputs.Marginals[i].input_data)
-                Inputs.Marginals[i].Parameters = [low_bound, up_bound]
+                Inputs.Marginals[i].parameters = [low_bound, up_bound]
 
+        # Initialize the parameter space to get the raw data and bounds
+        self.raw_data, self.bound_tuples = self.init_param_space(max_pce_deg)
+
+        # Pass user-defined samples as ED
         if self.sampling_method == 'user':
             samples = self.X
             self.NrSamples = len(samples)
 
         # Sample the distribution of parameters
-        if not self.arbitrary:
-            # Case I = polytype not arbitrary
-            # Execute initialization to get the boundtuples
-            raw_data, bounds = self.Parameter_Initialization(max_pce_deg)
-            self.raw_data = raw_data
-            self.BoundTuples = bounds
-            # Create ExpDesign in the actual space
-            if self.sampling_method != 'user':
-                samples = chaospy.generate_samples(n_samples, domain=self.JDist,
-                                                   rule=sample_method).T
-        elif self.arbitrary:
-            # Case II: polytype arbitrary or Input values are directly given by
-            # the user.
-
-            # Generate the samples based on requested method
-            if self.sampling_method == 'user':
-                raw_data, bounds = self.Parameter_Initialization(max_pce_deg)
-                self.raw_data = raw_data
-                self.BoundTuples = bounds
-
-            elif self.sampling_method == 'random':
-                samples = self.MCSampling(n_samples, max_pce_deg)
-
-            elif self.sampling_method == 'PCM' or self.sampling_method == 'LSCM':
-                samples = self.PCMSampling(max_pce_deg)
+        elif self.input_data_given:
+            # Case II: Input values are directly given by the user.
-        else:
-            # Execute initialization to get the boundtuples
-            raw_data, bounds = self.Parameter_Initialization(max_pce_deg)
-            self.raw_data = raw_data
-            self.BoundTuples = bounds
+            if self.sampling_method == 'random':
+                samples = self.random_sampler(n_samples)
+            elif self.sampling_method in ('PCM', 'LSCM'):
+                samples = self.pcm_sampler(max_pce_deg)
+
+            else:
+                # Create ExpDesign in the actual space using chaospy
+                try:
+                    samples = chaospy.generate_samples(n_samples,
+                                                       domain=self.JDist,
+                                                       rule=sample_method).T
+                except Exception:
+                    samples = self.JDist.sample(n_samples).T
+
+        elif not self.input_data_given:
+            # Case I: User passed known distributions
+            samples = chaospy.generate_samples(n_samples, domain=self.JDist,
+                                               rule=sample_method).T
 
         # Transform samples to the original space
         if transform:
@@ -173,127 +171,112 @@ class ExpDesigns:
         else:
             return samples
 
-    def transform(self, X, params=None):
+    # -------------------------------------------------------------------------
+    def init_param_space(self, max_deg=None):
         """
-        Transform the samples via either a Rosenblatt or an isoprobabilistic
-        transformation.
+        Initializes the parameter space.
 
         Parameters
         ----------
-        X : ndarray of shape (n_samples,n_params)
-            Samples to be transformed.
+        max_deg : int, optional
+            Maximum polynomial degree. The default is None.
 
         Returns
         -------
-        tr_X: ndarray of shape (n_samples,n_params)
-            Transformed samples.
+        raw_data : array of shape (n_params, n_samples)
+            Raw data.
+        bound_tuples : list of tuples
+            A list containing lower and upper bounds of parameters.
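+
+        Examples
+        --------
+        Illustrative sketch only; a readily configured two-parameter
+        `exp_design` object is assumed:
+
+        >>> raw_data, bound_tuples = exp_design.init_param_space(max_deg=2)
+        >>> raw_data.shape    # -> (2, mc_size)
+        >>> bound_tuples      # -> ((low_1, up_1), (low_2, up_2))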
 
         """
-        if self.InputObj.Rosenblatt:
-            self.origJDist, _ = self.DistConstructor(False)
-            tr_X = self.origJDist.inv(self.JDist.fwd(X.T)).T
-        else:
-            # Transform samples via an isoprobabilistic transformation
-            n_samples, n_params = X.shape
-            Inputs = self.InputObj
-            origJDist = self.JDist
-            poly_types = self.poly_types
-
-            disttypes = []
-            for par_i in range(n_params):
-                disttypes.append(Inputs.Marginals[par_i].dist_type)
-
-            # Pass non-transformed X, if arbitrary PCE is selected.
-            if None in disttypes or self.arbitrary:
-                return X
+        Inputs = self.InputObj
+        ndim = self.ndim
+        rosenblatt_flag = Inputs.Rosenblatt
+        mc_size = 50000
 
-        cdfx = np.zeros((X.shape))
-        tr_X = np.zeros((X.shape))
+        # Save parameter names
+        self.par_names = []
+        for parIdx in range(ndim):
+            self.par_names.append(Inputs.Marginals[parIdx].name)
 
-        for par_i in range(n_params):
+        # Create a multivariate probability distribution
+        if max_deg is not None:
+            JDist, poly_types = self.build_dist(rosenblatt=rosenblatt_flag)
+            self.JDist, self.poly_types = JDist, poly_types
 
-            # Extract the parameters of the original space
-            disttype = disttypes[par_i]
-            if disttype is not None:
-                dist = origJDist[par_i]
-            else:
-                dist = None
-            polytype = poly_types[par_i]
-            cdf = np.vectorize(lambda x: dist.cdf(x))
+        if self.input_data_given:
 
-            # Extract the parameters of the transformation space based on
-            # polyType
-            if polytype == 'legendre' or disttype == 'uniform':
-                # Generate Y_Dists based
-                params_Y = [-1, 1]
-                dist_Y = st.uniform(loc=params_Y[0],
-                                    scale=params_Y[1]-params_Y[0])
-                inv_cdf = np.vectorize(lambda x: dist_Y.ppf(x))
+            self.MCSize = len(Inputs.Marginals[0].input_data)
+            self.raw_data = np.zeros((ndim, self.MCSize))
 
-            elif polytype == 'hermite' or disttype == 'norm':
-                params_Y = [0, 1]
-                dist_Y = st.norm(loc=params_Y[0], scale=params_Y[1])
-                inv_cdf = np.vectorize(lambda x: dist_Y.ppf(x))
+            for parIdx in range(ndim):
+                # Save the given input data as raw data
+                try:
+                    self.raw_data[parIdx] = np.array(
+                        Inputs.Marginals[parIdx].input_data)
+                except Exception:
+                    self.raw_data[parIdx] = self.JDist[parIdx].sample(mc_size)
 
-            elif polytype == 'laguerre' or disttype == 'gamma':
-                params_Y = [1, params[1]]
-                dist_Y = st.gamma(loc=params_Y[0], scale=params_Y[1])
-                inv_cdf = np.vectorize(lambda x: dist_Y.ppf(x))
+        else:
+            # Generate random samples based on parameter distributions
+            self.raw_data = chaospy.generate_samples(mc_size,
+                                                     domain=self.JDist)
 
-            elif polytype == 'arbitrary' or disttype is None:
-                # mu_X = Inputs.Marginals[par_i].moments[0]
-                # stDev_X = Inputs.Marginals[par_i].moments[1]
-                # cdf = np.vectorize(lambda x: (x - mu_X) / stDev_X)
-                # # TODO: Unknown dist with gaussian_kde
-                # mu_Y = Y_marginals[par_i].moments[0]
-                # stDev_Y = Y_marginals[par_i].moments[1]
-                # inv_cdf = np.vectorize(lambda x: stDev_Y * x + mu_Y)
-                pass
+        # Create orthogonal polynomial coefficients if necessary
+        if self.meta_Model.lower() != 'gpe' and self.apce \
+                and max_deg is not None \
+                and Inputs.polycoeffsFlag:
+            self.polycoeffs = {}
+            for parIdx in tqdm(range(ndim), ascii=True,
+                               desc="Computing orth. polynomial coeffs"):
+                poly_coeffs = apoly_construction(self.raw_data[parIdx],
+                                                 max_deg)
+                self.polycoeffs[f'p_{parIdx+1}'] = poly_coeffs
 
-            # Compute CDF_x(X)
-            cdfx[:, par_i] = cdf(X[:, par_i])
+        # Extract moments
+        for parIdx in range(ndim):
+            mu = np.mean(self.raw_data[parIdx])
+            std = np.std(self.raw_data[parIdx])
+            self.InputObj.Marginals[parIdx].moments = [mu, std]
 
-            # Compute invCDF_y(cdfx)
-            tr_X[:, par_i] = inv_cdf(cdfx[:, par_i])
+        # Generate the bounds based on given inputs for marginals
+        bound_tuples = []
+        for i in range(ndim):
+            if Inputs.Marginals[i].dist_type == 'unif':
+                low_bound, up_bound = Inputs.Marginals[i].parameters
+            else:
+                low_bound = np.min(self.raw_data[i])
+                up_bound = np.max(self.raw_data[i])
 
-        return tr_X
+            bound_tuples.append((low_bound, up_bound))
 
-    def FitDist(self, y):
-        dist_results = []
-        params = {}
-        dist_names = ['lognorm', 'norm', 'uniform', 'expon']
-        for dist_name in dist_names:
-            dist = getattr(st, dist_name)
+        self.bound_tuples = tuple(bound_tuples)
 
-            try:
-                if dist_name != 'lognorm':
-                    param = dist.fit(y)
-                else:
-                    param = dist.fit(np.exp(y), floc=0)
-            except:
-                param = dist.fit(y)
+        return self.raw_data, self.bound_tuples
 
-            params[dist_name] = param
-            # Applying the Kolmogorov-Smirnov test
-            D, p = st.kstest(y, dist_name, args=param)
-            dist_results.append((dist_name, D))
+    # -------------------------------------------------------------------------
+    def build_dist(self, rosenblatt):
+        """
+        Creates the polynomial types to be passed to the univ_basis_vals
+        method of the MetaModel object.
 
-        # select the best fitted distribution
-        sel_dist, D = (min(dist_results, key=lambda item: item[1]))
+        Parameters
+        ----------
+        rosenblatt : bool
+            Rosenblatt transformation flag.
 
-        if sel_dist == 'uniform':
-            params[sel_dist] = [params[sel_dist][0], params[sel_dist][0] +
-                                params[sel_dist][1]]
-        if D < 0.05:
-            return sel_dist, params[sel_dist]
-        else:
-            return None, None
+        Returns
+        -------
+        orig_space_dist : object
+            A chaospy JDist object or a gaussian_kde object.
+        poly_types : list
+            List of polynomial types for the parameters.
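+
+        Examples
+        --------
+        Illustrative sketch only; a single uniform marginal on [0, 1] is
+        assumed:
+
+        >>> orig_dist, poly_types = exp_design.build_dist(rosenblatt=False)
+        >>> poly_types   # -> ['legendre'] ('arbitrary' for aPCE/data input)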
-    def DistConstructor(self, rosenblatt):
+        """
         Inputs = self.InputObj
         all_data = []
-        all_DistTypes = []
-        origJoints = []
+        all_dist_types = []
+        orig_joints = []
         poly_types = []
 
         for parIdx in range(self.ndim):
@@ -301,169 +284,126 @@ class ExpDesigns:
             if Inputs.Marginals[parIdx].dist_type is None:
                 data = Inputs.Marginals[parIdx].input_data
                 all_data.append(data)
-                DistType, params = self.FitDist(data)
-                Inputs.Marginals[parIdx].dist_type = DistType
+                dist_type, params = self.FitDist(data)
+                Inputs.Marginals[parIdx].dist_type = dist_type
                 Inputs.Marginals[parIdx].parameters = params
             else:
-                DistType = Inputs.Marginals[parIdx].dist_type
+                dist_type = Inputs.Marginals[parIdx].dist_type
                 params = Inputs.Marginals[parIdx].parameters
 
+            # Make dist_type lower case for consistency
+            if dist_type is not None:
+                dist_type = dist_type.lower()
+
             if rosenblatt:
                 polytype = 'hermite'
-                Dist = chaospy.Normal()
+                dist = chaospy.Normal()
 
-            elif DistType is None:
+            elif dist_type is None:
                 polytype = 'arbitrary'
-                Dist = None
+                dist = None
 
-            elif 'unif' in DistType:
+            elif 'unif' in dist_type:
                 polytype = 'legendre'
-                Dist = chaospy.Uniform(lower=params[0], upper=params[1])
+                dist = chaospy.Uniform(lower=params[0], upper=params[1])
 
-            elif DistType == 'norm':
+            elif 'norm' in dist_type:
                 polytype = 'hermite'
-                Dist = chaospy.Normal(mu=params[0], sigma=params[1])
+                dist = chaospy.Normal(mu=params[0], sigma=params[1])
 
-            elif DistType == 'gamma':
+            elif 'gamma' in dist_type:
                 polytype = 'laguerre'
-                Dist = chaospy.Gamma(shape=params[0],
+                dist = chaospy.Gamma(shape=params[0],
                                      scale=params[1],
                                      shift=params[2])
 
-            elif DistType == 'beta':
+            elif 'beta' in dist_type:
                 polytype = 'jacobi'
-                Dist = chaospy.Beta(alpha=params[0], beta=params[1],
+                dist = chaospy.Beta(alpha=params[0], beta=params[1],
                                     lower=params[2], upper=params[3])
 
-            elif 'lognorm' in DistType:
+            elif 'lognorm' in dist_type:
                 polytype = 'hermite'
                 Mu = np.log(params[0]**2/np.sqrt(params[0]**2 + params[1]**2))
                 Sigma = np.sqrt(np.log(1 + params[1]**2 / params[0]**2))
-                Dist = chaospy.LogNormal(mu=Mu, sigma=Sigma)
+                dist = chaospy.LogNormal(mu=Mu, sigma=Sigma)
 
-            elif DistType == 'exponential' or 'expon' in DistType:
+            elif 'expon' in dist_type:
                 polytype = 'arbitrary'
-                Dist = chaospy.Exponential(scale=params[0], shift=params[1])
+                dist = chaospy.Exponential(scale=params[0], shift=params[1])
 
-            elif DistType == 'weibull':
+            elif 'weibull' in dist_type:
                 polytype = 'arbitrary'
-                Dist = chaospy.Weibull(shape=params[0], scale=params[1],
+                dist = chaospy.Weibull(shape=params[0], scale=params[1],
                                        shift=params[2])
 
             else:
-                message = (f"DistType {DistType} for parameter"
+                message = (f"DistType {dist_type} for parameter "
                            f"{parIdx+1} is not available.")
                 raise ValueError(message)
 
-            if self.arbitrary:
+            if self.input_data_given or self.apce:
                 polytype = 'arbitrary'
 
             # Store dists and poly_types
-            origJoints.append(Dist)
+            orig_joints.append(dist)
             poly_types.append(polytype)
-            all_DistTypes.append(DistType)
+            all_dist_types.append(dist_type)
 
         # Prepare final output to return
-        if None in all_DistTypes:
+        if None in all_dist_types:
             # Naive approach: Fit a gaussian kernel to the provided data
             Data = np.asarray(all_data)
-            origSpaceDist = st.gaussian_kde(Data)
-            self.priorSpace = origSpaceDist
+            orig_space_dist = st.gaussian_kde(Data)
+            self.priorSpace = orig_space_dist
         else:
-            origSpaceDist = chaospy.J(*origJoints)
-            self.priorSpace = st.gaussian_kde(origSpaceDist.sample(10000))
+            orig_space_dist = chaospy.J(*orig_joints)
+            self.priorSpace = st.gaussian_kde(orig_space_dist.sample(10000))
 
-        return origSpaceDist, poly_types
+        return orig_space_dist, poly_types
 
-    def Parameter_Initialization(self, MaxPceDegree=None, OptDesignFlag=False):
+    # -------------------------------------------------------------------------
+    def random_sampler(self, n_samples):
         """
-        Initialization Uncertain Parameters in case arbitrary polytype is
-        selected.
-        """
-        Inputs = self.InputObj
-        ndim = self.ndim
-        Rosenblatt = Inputs.Rosenblatt
-        self.MCSize = 10000
-
-        # Create a multivariate probability distribution
-        if MaxPceDegree is not None:
-            JDist, poly_types = self.DistConstructor(rosenblatt=Rosenblatt)
-            self.JDist, self.poly_types = JDist, poly_types
-
-        if self.arbitrary:
-
-            self.MCSize = len(Inputs.Marginals[0].input_data)
-            self.raw_data = np.zeros((ndim, self.MCSize))
-            self.par_names = []
-
-            for parIdx in range(ndim):
-                # Save parameter names
-                self.par_names.append(Inputs.Marginals[parIdx].name)
-                try:
-                    self.raw_data[parIdx] = np.array(Inputs.Marginals[parIdx].input_data)
-                except:
-                    self.raw_data[parIdx] = self.JDist[parIdx].sample(self.MCSize)
-
-            # Create orthogonal polynomial coefficients if necessary
-            if self.meta_Model.lower() != 'gpe' \
-                    and MaxPceDegree is not None \
-                    and Inputs.polycoeffsFlag:
-                self.polycoeffs = {}
-                for parIdx in tqdm(range(ndim), ascii=True, desc="Computing orth. polynomial coeffs"):
-                    self.polycoeffs[f'p_{parIdx+1}'] = apoly_construction(self.raw_data[parIdx], MaxPceDegree)
-        else:
-            # Generate random samples based on parameter distributions
-            self.raw_data = chaospy.generate_samples(self.MCSize,
-                                                     domain=self.JDist)
-        # Extract moments
-        for parIdx in range(ndim):
-            mu = np.mean(self.raw_data[parIdx])
-            std = np.std(self.raw_data[parIdx])
-            self.InputObj.Marginals[parIdx].moments = [mu, std]
-
-        # Generate the bounds based on given inputs for marginals
-        BoundTuples = []
-        for i in range(ndim):
-            if Inputs.Marginals[i].dist_type == 'unif':
-                low_bound, up_bound = Inputs.Marginals[i].Parameters
-            else:
-                low_bound = np.min(self.raw_data[i])
-                up_bound = np.max(self.raw_data[i])
+        Samples the given raw data randomly.
 
-            BoundTuples.append((low_bound, up_bound))
-
-        self.BoundTuples = tuple(BoundTuples)
-
-        return self.raw_data, self.BoundTuples
+        Parameters
+        ----------
+        n_samples : int
+            Number of requested samples.
 
-    def MCSampling(self, NrSamples, MaxPceDegree):
-        """
-        Compute the requested number of sampling points.
-        Arguments
-        ---------
-        NrSamples : int
-            Number of points requested.
 
         Returns
         -------
-        ndarray[NrSamples, NofPa]
+        samples: array of shape (n_samples, n_params)
             The sampling locations in the input space.
-        """
-        # Execute initialization to get the boundtuples
-        raw_data, BoundTuples = self.Parameter_Initialization(MaxPceDegree)
-        self.raw_data, self.BoundTuples = raw_data, BoundTuples
-
-        Samples = np.zeros((NrSamples, self.ndim))
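+
+        Examples
+        --------
+        Illustrative sketch only; `raw_data` must already be available,
+        e.g. after a call to `init_param_space`:
+
+        >>> exp_design.init_param_space(max_deg=1)
+        >>> samples = exp_design.random_sampler(5)
+        >>> samples.shape   # -> (5, n_params)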
+
+        """
+        samples = np.zeros((n_samples, self.ndim))
 
         for idxPa in range(self.ndim):
             # input_data given
             sample_size = len(self.raw_data[idxPa])
-            randIdx = np.random.randint(0, sample_size, NrSamples)
-            Samples[:, idxPa] = self.raw_data[idxPa, randIdx]
+            randIdx = np.random.randint(0, sample_size, n_samples)
+            samples[:, idxPa] = self.raw_data[idxPa, randIdx]
+
+        return samples
+
+    # -------------------------------------------------------------------------
+    def pcm_sampler(self, max_deg):
+        """
+        Generates collocation points based on the roots of the univariate
+        orthogonal polynomials.
 
-        return Samples
+        Parameters
+        ----------
+        max_deg : int
+            Maximum degree defined by user.
+
+        Returns
+        -------
+        opt_col_points: array of shape (n_samples, n_params)
+            Collocation points.
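+
+        Examples
+        --------
+        Illustrative sketch only; `raw_data` and `NrSamples` are assumed
+        to be set beforehand:
+
+        >>> exp_design.NrSamples = 10
+        >>> col_points = exp_design.pcm_sampler(max_deg=2)
+        >>> col_points.shape   # -> (n_collocation, n_params)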
-    def PCMSampling(self, MaxPceDegree):
+        """
         raw_data = self.raw_data
 
@@ -475,49 +415,174 @@ class ExpDesigns:
                       (math.factorial(self.ndim) * math.factorial(d)))
             return np.array(result)
 
-        guess_Deg = np.where(M_uptoMax(MaxPceDegree) > self.NrSamples)[0][0]
+        guess_Deg = np.where(M_uptoMax(max_deg) > self.NrSamples)[0][0]
 
-        Cpoints = np.zeros(shape=[guess_Deg+1, self.ndim])
+        c_points = np.zeros((guess_Deg+1, self.ndim))
 
         def PolynomialPa(parIdx):
-            return aPoly_Construction(self.raw_data[parIdx], MaxPceDegree)
+            return apoly_construction(self.raw_data[parIdx], max_deg)
 
         for i in range(self.ndim):
             poly_coeffs = PolynomialPa(i)[guess_Deg+1][::-1]
-            Cpoints[:, i] = np.trim_zeros(np.roots(poly_coeffs))
+            c_points[:, i] = np.trim_zeros(np.roots(poly_coeffs))
 
         # Construction of optimal integration points
         Prod = itertools.product(np.arange(1, guess_Deg+2), repeat=self.ndim)
-        SortDigUniqueCombos = np.array(list(filter(lambda x: x, Prod)))
+        sort_dig_unique_combos = np.array(list(filter(lambda x: x, Prod)))
 
         # Ranking relatively mean
         Temp = np.empty(shape=[0, guess_Deg+1])
         for j in range(self.ndim):
-            s = abs(Cpoints[:, j]-np.mean(raw_data[j]))
+            s = abs(c_points[:, j]-np.mean(raw_data[j]))
             Temp = np.append(Temp, [s], axis=0)
         temp = Temp.T
 
         index_CP = np.sort(temp, axis=0)
-        SortCpoints = np.empty(shape=[0, guess_Deg+1])
+        sort_cpoints = np.empty((0, guess_Deg+1))
 
         for j in range(self.ndim):
-            SortCp = Cpoints[index_CP[:, j], j]
-            SortCpoints = np.vstack((SortCpoints, SortCp))
+            sort_cp = c_points[index_CP[:, j], j]
+            sort_cpoints = np.vstack((sort_cpoints, sort_cp))
 
         # Mapping of Combination to Cpoint Combination
-        SortUniqueCombos = np.empty(shape=[0, self.ndim])
-        for i in range(len(SortDigUniqueCombos)):
-            SortUnComb = []
+        sort_unique_combos = np.empty(shape=[0, self.ndim])
+        for i in range(len(sort_dig_unique_combos)):
+            sort_un_comb = []
             for j in range(self.ndim):
-                SortUC = SortCpoints[j, SortDigUniqueCombos[i, j]-1]
-                SortUnComb.append(SortUC)
-            SortUnicomb = np.asarray(SortUnComb)
-            SortUniqueCombos = np.vstack((SortUniqueCombos, SortUnicomb))
+                sort_uc = sort_cpoints[j, sort_dig_unique_combos[i, j]-1]
+                sort_un_comb.append(sort_uc)
+            sort_uni_comb = np.asarray(sort_un_comb)
+            sort_unique_combos = np.vstack((sort_unique_combos, sort_uni_comb))
 
         # Output the collocation points
-        if self.sampling_method == 'LSCM':
-            OptimalCollocationPointsBase = SortUniqueCombos
+        if self.sampling_method.lower() == 'lscm':
+            opt_col_points = sort_unique_combos
         else:
-            OptimalCollocationPointsBase = SortUniqueCombos[0:self.NrSamples]
+            opt_col_points = sort_unique_combos[0:self.NrSamples]
+
+        return opt_col_points
+
+    # -------------------------------------------------------------------------
+    def transform(self, X, params=None):
+        """
+        Transform the samples via either a Rosenblatt or an isoprobabilistic
+        transformation.
+
+        Parameters
+        ----------
+        X : ndarray of shape (n_samples,n_params)
+            Samples to be transformed.
+
+        Returns
+        -------
+        tr_X: ndarray of shape (n_samples,n_params)
+            Transformed samples.
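+
+        Examples
+        --------
+        Illustrative sketch only; `X` is assumed to contain samples drawn
+        from the known (non-arbitrary) input distributions:
+
+        >>> X = exp_design.generate_samples(100)
+        >>> tr_X = exp_design.transform(X)
+        >>> tr_X.shape   # -> same shape as X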
+
+        """
+        if self.InputObj.Rosenblatt:
+            self.origJDist, _ = self.build_dist(False)
+            tr_X = self.origJDist.inv(self.JDist.fwd(X.T)).T
+        else:
+            # Transform samples via an isoprobabilistic transformation
+            n_samples, n_params = X.shape
+            Inputs = self.InputObj
+            origJDist = self.JDist
+            poly_types = self.poly_types
+
+            disttypes = []
+            for par_i in range(n_params):
+                disttypes.append(Inputs.Marginals[par_i].dist_type)
+
+            # Pass non-transformed X, if arbitrary PCE is selected.
+            if None in disttypes or self.input_data_given or self.apce:
+                return X
+
+            cdfx = np.zeros((X.shape))
+            tr_X = np.zeros((X.shape))
+
+            for par_i in range(n_params):
+
+                # Extract the parameters of the original space
+                disttype = disttypes[par_i]
+                if disttype is not None:
+                    dist = origJDist[par_i]
+                else:
+                    dist = None
+                polytype = poly_types[par_i]
+                cdf = np.vectorize(lambda x: dist.cdf(x))
+
+                # Extract the parameters of the transformation space based on
+                # polyType
+                if polytype == 'legendre' or disttype == 'uniform':
+                    # Generate the distribution in the transformed space
+                    params_Y = [-1, 1]
+                    dist_Y = st.uniform(loc=params_Y[0],
+                                        scale=params_Y[1]-params_Y[0])
+                    inv_cdf = np.vectorize(lambda x: dist_Y.ppf(x))
+
+                elif polytype == 'hermite' or disttype == 'norm':
+                    params_Y = [0, 1]
+                    dist_Y = st.norm(loc=params_Y[0], scale=params_Y[1])
+                    inv_cdf = np.vectorize(lambda x: dist_Y.ppf(x))
+
+                elif polytype == 'laguerre' or disttype == 'gamma':
+                    params_Y = [1, params[1]]
+                    dist_Y = st.gamma(loc=params_Y[0], scale=params_Y[1])
+                    inv_cdf = np.vectorize(lambda x: dist_Y.ppf(x))
+
+                # Compute CDF_x(X)
+                cdfx[:, par_i] = cdf(X[:, par_i])
-        return OptimalCollocationPointsBase
+                # Compute invCDF_y(cdfx)
+                tr_X[:, par_i] = inv_cdf(cdfx[:, par_i])
+
+        return tr_X
+
+    # -------------------------------------------------------------------------
+    def FitDist(self, y):
+        """
+        Fits the known distributions to the data.
+
+        Parameters
+        ----------
+        y : array of shape (n_samples)
+            Data to be fitted.
+
+        Returns
+        -------
+        sel_dist: string
+            Selected distribution type from `lognorm`, `norm`, `uniform` or
+            `expon`.
+        params : list
+            Parameters corresponding to the selected distribution type.
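+
+        Examples
+        --------
+        Illustrative sketch only (1000 standard-normal samples):
+
+        >>> y = np.random.normal(0, 1, 1000)
+        >>> sel_dist, params = exp_design.FitDist(y)
+        >>> sel_dist   # -> 'norm' if the KS test passes, otherwise None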
+
+        """
+        dist_results = []
+        params = {}
+        dist_names = ['lognorm', 'norm', 'uniform', 'expon']
+        for dist_name in dist_names:
+            dist = getattr(st, dist_name)
+
+            try:
+                if dist_name != 'lognorm':
+                    param = dist.fit(y)
+                else:
+                    param = dist.fit(np.exp(y), floc=0)
+            except Exception:
+                param = dist.fit(y)
+
+            params[dist_name] = param
+            # Applying the Kolmogorov-Smirnov test
+            D, p = st.kstest(y, dist_name, args=param)
+            dist_results.append((dist_name, D))
+
+        # Select the best fitted distribution
+        sel_dist, D = min(dist_results, key=lambda item: item[1])
+
+        if sel_dist == 'uniform':
+            params[sel_dist] = [params[sel_dist][0], params[sel_dist][0] +
+                                params[sel_dist][1]]
+        if D < 0.05:
+            return sel_dist, params[sel_dist]
+        else:
+            return None, None
diff --git a/src/bayesvalidrox/surrogate_models/sequential_design.py b/src/bayesvalidrox/surrogate_models/sequential_design.py
index a0fcc535d5f54016af9ffed311382d8e6e42cb98..ccf6a35f265d535ad206a7a31fe3154f5d59343f 100644
--- a/src/bayesvalidrox/surrogate_models/sequential_design.py
+++ b/src/bayesvalidrox/surrogate_models/sequential_design.py
@@ -1090,7 +1090,7 @@ class SeqDesign():
 
         # Initialization
         PCEModel = self.MetaModel
-        Bounds = PCEModel.BoundTuples
+        Bounds = PCEModel.bound_tuples
         n_new_samples = PCEModel.ExpDesign.n_new_samples
         explore_method = PCEModel.ExpDesign.explore_method
         exploit_method = PCEModel.ExpDesign.exploit_method
@@ -1204,8 +1204,8 @@ class SeqDesign():
                     ax1.annotate(txt, (allCandidates[i, 0],
                                        allCandidates[i, 1]))
 
-            plt.xlim(self.BoundTuples[0])
-            plt.ylim(self.BoundTuples[1])
+            plt.xlim(self.bound_tuples[0])
+            plt.ylim(self.bound_tuples[1])
 
             # plt.show()
             plt.legend(loc='upper left')
@@ -1576,7 +1576,7 @@ class SeqDesign():
             os.makedirs(newpath)
 
         lw = 3.
-        BoundTuples = self.BoundTuples
+        bound_tuples = self.bound_tuples
         n_params = len(MAP)
 
         # This is the true mean of the second mode that we used above:
@@ -1589,14 +1589,14 @@ class SeqDesign():
 
             figPosterior, ax = plt.subplots()
             plt.hist2d(Posterior[:, 0], Posterior[:, 1], bins=(200, 200),
-                       range=np.array([BoundTuples[0], BoundTuples[1]]),
+                       range=np.array([bound_tuples[0], bound_tuples[1]]),
                        cmap=plt.cm.jet)
 
             plt.xlabel(parNames[0])
             plt.ylabel(parNames[1])
 
-            plt.xlim(BoundTuples[0])
-            plt.ylim(BoundTuples[1])
+            plt.xlim(bound_tuples[0])
+            plt.ylim(bound_tuples[1])
 
             ax.axvline(value1[0], color="g", lw=lw)
             ax.axhline(value1[1], color="g", lw=lw)
diff --git a/src/bayesvalidrox/surrogate_models/surrogate_models.py b/src/bayesvalidrox/surrogate_models/surrogate_models.py
index 292dc0fde2dd3d2624363af04a36e5bb0b3f1ce4..e8620e1bb9629ef4f50f1fc3ff7001c3a7e9e9e2 100644
--- a/src/bayesvalidrox/surrogate_models/surrogate_models.py
+++ b/src/bayesvalidrox/surrogate_models/surrogate_models.py
@@ -301,7 +301,7 @@ class MetaModel:
             ExpDesign.n_init_samples = ExpDesign.X.shape[0]
 
         # Read EDX and pass it to ExpDesign object
-        out_names = self.ModelObj.Output.Names
+        out_names = self.ModelObj.Output.names
         ExpDesign.Y = {}
 
         # Extract x values
@@ -334,7 +334,7 @@ class MetaModel:
                                           max_pce_deg=np.max(self.pce_deg))
         ExpDesign.X = ED_X
         ExpDesign.collocationPoints = ED_X_tr
-        self.BoundTuples = ExpDesign.BoundTuples
+        self.bound_tuples = ExpDesign.bound_tuples
 
         # ---- Run simulations at X ----
         if not hasattr(ExpDesign, 'Y'):
@@ -378,7 +378,7 @@ class MetaModel:
         n_max = np.max(self.pce_deg) if n_max is None else n_max
 
         # Extract poly coeffs
-        if self.ExpDesign.arbitrary:
+        if self.ExpDesign.input_data_given or self.ExpDesign.apce:
             apolycoeffs = self.ExpDesign.polycoeffs
         else:
             apolycoeffs = None
@@ -486,7 +486,7 @@ class MetaModel:
 
         # Bayes sparse adaptive aPCE
         if reg_method.lower() != 'ols':
-            if reg_method.lower() == 'brr' or np.all(y == 0):
+            if reg_method.lower() == 'brr' or np.var(y) == 0:
                 clf_poly = lm.BayesianRidge(n_iter=1000, tol=1e-7,
                                             fit_intercept=True,
                                             normalize=True,
@@ -905,28 +905,40 @@ class MetaModel:
 
         """
        # Transform via Principal Component Analysis
-        if isinstance(self, 'varPCAThreshold'):
-            varPCAThreshold = self.varPCAThreshold
+        if hasattr(self, 'var_pca_threshold'):
+            var_pca_threshold = self.var_pca_threshold
         else:
-            varPCAThreshold = 100.0
+            var_pca_threshold = 100.0
         n_samples, n_features = Output.shape
 
-        # Instantiate and fit sklearnPCA object
-        covar_matrix = sklearnPCA(n_components=None)
-        covar_matrix.fit(Output)
-        var = np.cumsum(np.round(covar_matrix.explained_variance_ratio_,
-                                 decimals=5)*100)
-        # Find the number of components to explain self.varPCAThreshold of
-        # variance
-        try:
-            selected_n_components = np.where(var >= varPCAThreshold)[0][0] + 1
-        except ValueError:
-            selected_n_components = min(n_samples, n_features)
+        if hasattr(self, 'n_pca_components'):
+            n_pca_components = self.n_pca_components
+        else:
+            # Instantiate and fit sklearnPCA object
+            covar_matrix = sklearnPCA(n_components=None)
+            covar_matrix.fit(Output)
+            var = np.cumsum(np.round(covar_matrix.explained_variance_ratio_,
+                                     decimals=5)*100)
+            # Find the number of components to explain var_pca_threshold of
+            # the variance
+            try:
+                n_components = np.where(var >= var_pca_threshold)[0][0] + 1
+            except IndexError:
+                n_components = min(n_samples, n_features)
+
+            n_pca_components = min(n_samples, n_features, n_components)
 
-        nComponents = min(n_samples, n_features, selected_n_components)
+        # Print out a report
+        print()
+        print('-' * 50)
+        print(f"PCA transformation is performed with {n_pca_components}"
+              " components.")
+        print('-' * 50)
+        print()
 
         # Fit and transform with the selected number of components
-        pca = sklearnPCA(n_components=nComponents, svd_solver='randomized')
+        pca = sklearnPCA(n_components=n_pca_components,
+                         svd_solver='randomized')
         OutputMatrix = pca.fit_transform(Output)
 
         return pca, OutputMatrix
@@ -1092,8 +1104,7 @@ class MetaModel:
 
             if self.dim_red_method.lower() == 'pca':
                 PCA = self.pca[ouput]
-                mean_pred[ouput] = PCA.mean_
-                std_pred[ouput] += np.dot(mean, PCA.components_)
+                mean_pred[ouput] = PCA.mean_ + np.dot(mean, PCA.components_)
                 std_pred[ouput] = np.sqrt(np.dot(std**2, PCA.components_**2))
             else:
                 mean_pred[ouput] = mean