# Source code for Taweret.mix.bivariate_linear

# Author : Dananjaya Liyanage
# Email : liyanagedananjaya@gmail.com

import bilby
import numpy as np
from Taweret.utils.utils import mixture_function, eps, normed_mvn_loglike
from Taweret.core.base_mixer import BaseMixer
from Taweret.core.base_model import BaseModel
from Taweret.sampler.likelihood_wrappers import likelihood_wrapper_for_bilby

# typing imports

from typing import Dict
from typing import List
from typing import Optional
from typing import Type


class BivariateLinear(BaseMixer):
    '''
    Local linear Bayesian mixing of two models. This is a general class
    of mixing that offers both density (likelihood) and mean mixing
    methods. The default mixing method is linear mixing of two
    likelihoods.
    '''

    def __init__(
            self,
            models_dic: Dict[str, Type[BaseModel]],
            method: str = 'sigmoid',
            nargs_model_dic: Optional[Dict[str, int]] = None,
            same_parameters: bool = False,
            full_cov: bool = False,
            BMMcor: bool = False,
            mean_mix: bool = False):
        '''
        Parameters
        ----------
        models_dic : dictionary {'name1' : model1, 'name2' : model2}
            Two models to mix, each must be derived from the base_model.
        method : str
            Mixing weight function form. This is a function of input
            parameters.
        nargs_model_dic : dictionary {'name1' : N_model1, 'name2' : N_model2}
            Only used in calibration. Number of free parameters in each
            model.
        same_parameters : bool
            Only used in BMM with calibration. If set, two models are
            assumed to have same parameters.
        full_cov : bool
            This option is only used in BMMcor method. For BMMC full
            covariance is not needed and mean_mix must have full
            covariance.
        BMMcor : bool
            If set use BMMcor method for Bayesian model mixing.
        mean_mix : bool
            If set use mean mixing method for Bayesian model mixing.
        '''
        if nargs_model_dic is None:
            nargs_model_dic = {}
        if not isinstance(nargs_model_dic, dict):
            raise AttributeError("nargs_model_dic has to be a dictionary")

        # Bivariate mixing supports exactly two component models.
        if len(models_dic) != 2:
            raise Exception('Bivariate linear mixing can mix only two '
                            'models. Please look at the other mixing '
                            'methods in Taweret for multi model mixing')

        # BUG FIX: the original wrapped `isinstance(model, BaseModel)` in a
        # try/except, but isinstance never raises here, so the type check
        # was a no-op — a model of the wrong type passed silently.
        # Check the boolean result instead.
        for model_name, model in models_dic.items():
            if not isinstance(model, BaseModel):
                raise AttributeError(
                    f'model {model_name} is not derived from '
                    'taweret.core.base_model class')
        self.models_dic = models_dic

        # If a new mixing method is added the following needs to be
        # updated. It maps each method to its number of free parameters.
        method_n_mix_dic = {'step': 1, 'addstep': 2, 'addstepasym': 3,
                            'sigmoid': 2, 'cdf': 2, 'switchcos': 3,
                            'calibrate_model_1': 0, 'calibrate_model_2': 0}

        # Check that the requested mixing function exists.
        if method not in method_n_mix_dic:
            raise Exception('Mixing function is not found')
        self.n_mix = method_n_mix_dic[method]

        # Assign default Uniform(0, 1) priors for each mixing parameter,
        # keyed '<method>_0', '<method>_1', ...
        priors = bilby.core.prior.PriorDict()
        for i in range(0, self.n_mix):
            name = f'{method}_{i}'
            priors[name] = bilby.core.prior.Uniform(0, 1, name=name)

        self.same_parameters = same_parameters
        self.method = method
        self.nargs_model_dic = nargs_model_dic
        # Flag to know if the model was trained or not
        self.model_was_trained = False
        self._map = None
        self._posterior = None
        # This combines model priors with mixing method priors
        self._prior = self.set_prior(priors)
        self.BMMcor = BMMcor
        self.mean_mix = mean_mix
        self.full_cov = full_cov

    # Attributes
    @property
    def prior(self):
        return self._prior

    @prior.setter
    def prior(self, bilby_prior_dic):
        return self.set_prior(bilby_prior_dic)

    @property
    def posterior(self):
        if self._posterior is None:
            raise Exception('First train the model to access the posterior')
        return self._posterior

    @property
    def map(self):
        if self._map is None:
            raise Exception('First train the model to access the MAP')
        return self._map
    # End Attributes
[docs] def evaluate(self, mixture_params: np.ndarray, x: np.ndarray, model_params: Optional[List[np.ndarray]] = [] ) -> np.ndarray: ''' Evaluate the mixed model for given parameters at input values x Parameters: ----------- mixture_params : np.1darray parameter values that fix the shape of mixing function x : np.1daray input parameter values array model_params: list[model_1_params, mode_2_params] list of model parameter values for each model Returns: -------- evaluation : np.2darray the evaluation of the mixed model at input values x Has the shape of len(x) x Number of observables in the model ''' w1, w2 = mixture_function(self.method, x, mixture_params, self.prior) model_1, model_2 = list(self.models_dic.values()) try: if self.same_parameters: try: model_1_out, _, _ = model_1.evaluate(x, model_params[0]) except BaseException: model_1_out, _ = model_1.evaluate(x, model_params[0]) else: try: model_1_out, _, _ = model_1.evaluate(x, model_params[0]) except BaseException: model_1_out, _ = model_1.evaluate(x, model_params[0]) except BaseException: try: model_1_out, _ = model_1.evaluate(x) except BaseException: model_1_out, _, _ = model_1.evaluate(x) try: if self.same_parameters: try: model_2_out, _, _ = model_2.evaluate(x, model_params[0]) except BaseException: model_2_out, _ = model_2.evaluate(x, model_params[0]) else: try: model_2_out, _, _ = model_2.evaluate(x, model_params[1]) except BaseException: model_2_out, _ = model_2.evaluate(x, model_params[1]) except BaseException: try: model_2_out, _, _ = model_2.evaluate(x) except BaseException: model_2_out, _ = model_2.evaluate(x) model_1_out = np.array(model_1_out) model_2_out = np.array(model_2_out) if model_1_out.ndim == model_2_out.ndim and model_2_out.ndim <= 1: return w1 * model_1_out + w2 * model_2_out elif model_1_out.ndim == model_2_out.ndim and model_2_out.ndim == 2: if len(model_1_out) != len(model_2_out): raise Exception( 'Dimension mismatch between outputs of models') outputs = [] for obs_n in range(0, 
model_1_out.shape[1]): outputs.append( w1 * model_1_out[:, obs_n] + w2 * model_2_out[:, obs_n]) return np.array(outputs) else: assert Exception(f'Dimensional mismatch: dim of model 1 is \ {model_1_out.ndim} , \ model 2 is {model_2_out.ndim}')
[docs] def evaluate_weights(self, mixture_params: np.ndarray, x: np.ndarray) -> np.ndarray: ''' return the mixing function values at the input parameter values x Parameters: ----------- mixture_params : np.1darray parameter values that fix the shape of mixing function x : np.1darray input parameter values Returns: -------- weights : list[np.1darray, np.1darray] weights for model 1 and model 2 at input values x ''' return mixture_function(self.method, x, mixture_params, self.prior)
[docs] def predict(self, x: np.ndarray, CI: List = [5, 95], samples: Optional[np.ndarray] = None, nthin: int = 1): ''' Evaluate posterior to make prediction at test points x. Parameters: ----------- x : np.1darray input parameter values CI : list confidence intervals as percentages samples: np.ndarray If samples are given use that instead of posterior\ for predictions. Returns: -------- evaluated_posterior : np.ndarray array of posterior predictive distribution evaluated at provided test points mean : np.ndarray average mixed model value at each provided test points credible_intervals : np.ndarray intervals corresponding for 60%, 90% credible intervals std_dev : np.ndarray sample standard deviation of mixed model output at provided test points ''' if self.model_was_trained is False and samples is None: raise Exception('Posterior is not available to make predictions\n\ train the model before predicting') pos_predictions = [] if samples is not None: posterior = samples else: posterior = self._posterior for sample in posterior[::nthin]: sample = np.array(sample).flatten() mixture_param = sample[0:self.n_mix] model_params = [] n_args_for_models = list(self.nargs_model_dic.values()) n_args_sum = 0 for i in range(0, len(n_args_for_models)): model_params.append( sample[self.n_mix + n_args_sum:self.n_mix + n_args_sum + n_args_for_models[i]]) n_args_sum += n_args_for_models[i] if self.same_parameters: break value = self.evaluate(mixture_param, x, model_params) pos_predictions.append(value) pos_predictions = np.array(pos_predictions) CIs = np.nanpercentile(pos_predictions, CI, axis=0, keepdims=True) mean = np.nanmean(pos_predictions, axis=0, keepdims=True) std_dev = np.nanstd(pos_predictions, axis=0, keepdims=True) return pos_predictions, mean, CIs, std_dev
[docs] def predict_weights(self, x: np.ndarray, CI: List = [5, 95], samples: Optional[np.ndarray] = None): ''' Calculate posterior predictive distribution for first model weights Parameters: ----------- x : np.1darray input parameter values CI : list confidence intervals samples: np.ndarray If samples are given use that instead of posterior\ for predictions. Returns: -------- posterior_weights : np.ndarray array of posterior predictive distribution of weights mean : np.ndarray average mixed model value at each provided test points credible_intervals : np.ndarray intervals corresponding for 60%, 90% credible intervals std_dev : np.ndarray sample standard deviation of mixed model output at provided test points ''' if self.model_was_trained is False and samples is None: raise Exception('Posterior is not available to make predictions\n\ train the model before predicting') pos_predictions = [] if samples is not None: posterior = samples else: posterior = self._posterior for sample in posterior: sample = np.array(sample).flatten() mixture_param = sample[0:self.n_mix] value, _ = self.evaluate_weights(mixture_param, x) pos_predictions.append(value) pos_predictions = np.array(pos_predictions) CIs = np.percentile(pos_predictions, CI, axis=0) mean = np.mean(pos_predictions, axis=0) std_dev = np.std(pos_predictions, axis=0) return pos_predictions, mean, CIs, std_dev
[docs] def prior_predict(self, x: np.ndarray, CI: List = [5, 95], n_sample: int = 10000): ''' Evaluate prior to make prediction at test points x. Parameters: ----------- x : np.1darray input parameter values CI : list confidence intervals n_samples : int number of samples to evaluate prior_prediction Returns: -------- evaluated_prior : np.ndarray array of prior predictive distribution evaluated at provided test points mean : np.ndarray average mixed model value at each provided test points credible_intervals : np.ndarray intervals corresponding for 60%, 90% credible intervals std_dev : np.ndarray sample standard deviation of mixed model output at provided test points ''' if self._prior is None: raise Exception("Define the prior first using set_prior") else: samples = np.array(list(self._prior.sample(n_sample).values())).T return self.predict(x, CI=CI, samples=samples)
[docs] def set_prior(self, bilby_prior_dic): ''' Set prior for the mixing function parameters. Prior for the model parameters should be defined in each model. Parameters ---------- bilby_prior_dic : bilby.core.prior.PriorDict The keys should be named as following : '<mix_func_name>_1', '<mix_func_name>_2', ... Returns ------- A full Bilby prior object for the mixed model. Including the mixing function parameters and model parameters. The Bilby prior dictionary has following keys. Prior for mixture function parameter : '<mix_func_name>_1', '<mix_func_name>_2', ... Prior parameters for model 1 : '<name_of_the_model>_1', '<name_of_the_model>_2' , ... Prior parameters for model 2 : '<name_of_the_model>_1', '<name_of_the_model>_2' , ... ''' for name, model in self.models_dic.items(): if model.prior is None: continue else: priors = model.prior for ii, entry2 in enumerate(priors): bilby_prior_dic[f'{name}_{ii}'] = priors[entry2] if self.same_parameters: break self._prior = bilby_prior_dic return self._prior
    def mix_loglikelihood(self,
                          mixture_params: np.ndarray,
                          model_param: np.ndarray,
                          x_exp: np.ndarray,
                          y_exp: np.ndarray,
                          y_err: np.ndarray) -> float:
        """
        log likelihood of the mixed model given the mixing function
        parameters

        Parameters:
        -----------
        mixture_params : np.1darray
            parameter values that fix the shape of mixing function
        model_param : list[model_1_params, model_2_params]
            list of model parameter values for each model
            (NOTE(review): annotated np.ndarray but indexed like a list)
        x_exp: np.1darray
            Experimentally measured input values
        y_exp: np.2darray
            Experimentally measured observable values. Takes the shape
            len(x_exp) x number of observable types measured
        y_err: np.2darray
            Experimentally measured observable errors. Takes the shape
            len(x_exp) x number of observable types measured
        """
        # Split the flat parameter container into per-model parameters.
        # Empty container means both models are parameter-free.
        if len(model_param) == 0:
            model_1_param = np.array([])
            model_2_param = np.array([])
        else:
            if self.same_parameters:
                # One shared parameter vector feeds both models.
                model_1_param = model_param[0]
                model_2_param = model_param[0]
            else:
                model_1_param, model_2_param = model_param
        # Per-input mixing weights for the two models.
        W_1, W_2 = self.evaluate_weights(mixture_params, x_exp)
        model_1, model_2 = list(self.models_dic.values())
        # --- Branch 1: BMMcor (correlated likelihood mixing) ------------
        if self.BMMcor and not self.mean_mix:
            models_ar = [model_1, model_2]
            weight_ar = [W_1, W_2]
            model_param_ar = [model_1_param, model_2_param]
            L_ar = []
            for i in range(0, 2):
                model = models_ar[i]
                # NOTE(review): `model_param` (the function argument) is
                # reassigned here as a loop variable.
                model_param = model_param_ar[i]
                predictions, model_errs, cov_mat = model.evaluate(
                    x_exp, model_param, full_corr=self.full_cov)
                # x_exp is flattened in place; idempotent on the second
                # pass of the loop.
                x_exp = x_exp.flatten()
                y_exp_all = y_exp
                if len(x_exp) != y_exp_all.shape[0]:
                    raise Exception(
                        'Dimensionality mismatch between x_exp and y_exp')
                # Mask out non-finite (missing) experimental entries.
                mask_y_exp = np.isfinite(y_exp_all)
                mask_flat = mask_y_exp.flatten()
                # Broadcast each per-input weight across all observable
                # columns, then flatten to match the flattened data.
                weights = []
                W = weight_ar[i]
                for w in W:
                    ww_array = w * np.ones(y_exp_all.shape[1])
                    weights.append(ww_array)
                weights = np.array(weights).flatten()[mask_flat]
                predictions = np.array(predictions).flatten()[mask_flat]
                y_exp_all = np.array(y_exp_all).flatten()[mask_flat]
                y_err_all = np.array(y_err).flatten()[mask_flat]
                # Weighted residuals for this model.
                diff = (predictions - y_exp_all) * weights
                # Experimental variance plus (optional) model covariance.
                final_cov = np.diag(np.square(y_err_all))
                if cov_mat is not None:
                    final_cov += cov_mat
                L_ar.append(normed_mvn_loglike(diff, final_cov))
            L1, L2 = L_ar
            return L1 + L2
        # --- Branch 2: mean mixing --------------------------------------
        if self.mean_mix and not self.BMMcor:
            predictions_1, model_errs_1, cov_mat_1 = model_1.evaluate(
                x_exp, model_1_param, full_corr=True)
            predictions_2, model_errs_2, cov_mat_2 = model_2.evaluate(
                x_exp, model_2_param, full_corr=True)
            # Broadcast model-1 weights across observable columns;
            # model 2 implicitly gets (1 - w).
            weights = []
            for w in W_1:
                weights.append(w * np.ones(y_exp.shape[1]))
            weights = np.array(weights).flatten()
            predictions_1 = np.array(predictions_1).flatten()
            predictions_2 = np.array(predictions_2).flatten()
            y_exp_all = np.array(y_exp).flatten()
            y_err_all = np.array(y_err).flatten()
            # Convex combination of the two model means.
            mix_prediction = predictions_1 * \
                weights + predictions_2 * (1 - weights)
            diff = y_exp_all - mix_prediction
            # A better optimization yield 5 times the speed.
            # For testing of the method look at tests.ipynb in notebooks
            # folder.
            # N = len(y_exp_all)
            # final_cov = np.zeros((N,N))
            # for i in range(0,N):
            #     for j in range(0,N):
            #         final_cov[i,j] = weights[i]*weights[j]*cov_mat_1[i,j]
            #         + (1-weights[i])*(1-weights[j])*cov_mat_2[i,j]
            # Comment out the below code. BMM mean mixing does not touch
            # the covariance structure....yet...
            # w1_mat = np.outer(weights, weights)
            # w2_mat = np.outer(1-weights,1-weights)
            # final_cov = w1_mat * cov_mat_1 + w2_mat * cov_mat_2
            # Covariances are summed unweighted (see note above).
            if cov_mat_1 is not None and cov_mat_2 is not None:
                final_cov = cov_mat_1 + cov_mat_2
                final_cov += np.diag(np.square(y_err_all))
            else:
                final_cov = np.diag(np.square(y_err_all))
            return normed_mvn_loglike(diff, final_cov)
        # --- Branch 3 (default): linear likelihood mixing ---------------
        else:
            # Log-weights; eps guards log(0).
            W_1 = np.log(W_1 + eps)
            W_2 = np.log(W_2 + eps)
            # 'calibrate_model_*' methods zero out the other model's
            # elementwise log-likelihood (weight fixed to one model).
            if self.method == 'calibrate_model_2':
                L1 = np.zeros(len(x_exp))
                L2 = model_2.log_likelihood_elementwise(
                    x_exp, y_exp, y_err, model_2_param)
            elif self.method == 'calibrate_model_1':
                L2 = np.zeros(len(x_exp))
                L1 = model_1.log_likelihood_elementwise(
                    x_exp, y_exp, y_err, model_1_param)
            else:
                L1 = model_1.log_likelihood_elementwise(
                    x_exp, y_exp, y_err, model_1_param)
                L2 = model_2.log_likelihood_elementwise(
                    x_exp, y_exp, y_err, model_2_param)
            # L1 = log_likelihood_elementwise(self.models_dic.items()[0],
            #          self.x_exp, self.y_exp, self.y_err, model_1_param)
            # L2 = log_likelihood_elementwise(self.models_dic.items()[1],
            #          self.x_exp, self.y_exp, self.y_err, model_2_param)
            # we use the logaddexp here for numerical accuracy. Look at the
            # mix_loglikelihood_test to check for an alternative (common)
            # way
            mixed_loglikelihood_elementwise = np.logaddexp(W_1 + L1,
                                                           W_2 + L2)
            return np.sum(mixed_loglikelihood_elementwise).item()
[docs] def train(self, x_exp: np.ndarray, y_exp: np.ndarray, y_err: np.ndarray, label: str = 'bivariate_mix', outdir: str = 'outdir', kwargs_for_sampler: Optional[Dict[str, int]] = None, load_previous: bool = False, plot: bool = False, ): ''' Run sampler to learn parameters. Method should also create class members that store the posterior and other diagnostic quantities important for plotting MAP values, and finds the MAP values for each parameter, and sets them equal to a class variable for easy access. Parameters: ----------- x_exp: np.1darray Experimentally measured input values y_exp: np.2darray Experimentally measured observable values. Takes the shape len(x_exp) x number of observable types measured y_err: np.2darray Experimentally measured observable errors. Takes the shape len(x_exp) x number of observable types measured label: str Name of the chain to be stored after sampling outdir: str Where to save the MCMC chain and output of bilby samplers kwargs_for_sampler: Dict Optional arguments to be used instead of default Bibly sampler settings load_previous: bool If a previous training has been done, load that chain instead of retraining. Returns: -------- result : bilby posterior object object returned by the bilby sampler ''' import platform if platform.system() == 'Darwin': if 'threads' in kwargs_for_sampler.keys( ) and kwargs_for_sampler['threads'] > 1: import warnings import multiprocessing warnings.warn("'threads' detected in `kwargs` on Darwin." + " Setting `start_method` to `fork`") try: multiprocessing.set_start_method('fork', force=True) except RuntimeError: pass prior = self._prior if prior is None: raise Exception("Please define the priors before training") if platform.system() == "Darwin": if "threads" in kwargs_for_sampler.keys( ) and kwargs_for_sampler['threads'] > 1: import warnings import multiprocessing warnings.warn("'threads' dectected in 'kwargs_for_sampler'" + " on Darwin. 
Setting `start_method` to `fork`") try: multiprocessing.set_start_method('fork', force=True) except RuntimeError: pass # A few simple setup steps likelihood = likelihood_wrapper_for_bilby(self, x_exp, y_exp, y_err) # if os.path.exists(outdir) and load_previous: try: result = bilby.result.read_in_result(outdir=outdir, label=label) except Exception: # if os.path.exists(outdir+'/'+label): # shutil.rmtree(outdir+'/'+label) if kwargs_for_sampler is None: kwargs_for_sampler = {'sampler': 'ptemcee', 'ntemps': 10, 'nwalkers': 200, 'Tmax': 100, 'burn_in_fixed_discard': 5000, 'nsamples': 20000, 'threads': 28, 'printdt': 60} # 'safety':2, # 'autocorr_tol':5} result = bilby.run_sampler( likelihood, prior, label=label, outdir=outdir, **kwargs_for_sampler, plot=plot) # The last two columns are model liklihood and log_prior. self._posterior = result.posterior.values[:, 0:-2] self.model_was_trained = True # Shorcut to find MAP. Need to implement a proper optimization # routine to find MAP # currently MAP finding has to be done outside the bivariate # mixing script because it can be computationally heavy. # Users are given access to the log likelihood and they can # optimize it Outside of bivaraiate linear mixing script. self._map = self._posterior[np.argmax( result.posterior.values[:, -2].flatten()), :] return result