from scipy.linalg import lapack
# from logging import raiseExceptions
import numpy as np
from scipy.special import expit
from scipy.stats import norm, beta, dirichlet
# Defines log likelihoods to be calculated given a model with a predict
# function and experimental measurements.
# Quick fix until I find a permanent solution to put normed likelihood
# calculation code here.
# Path to Jetscape model source code
import sys
sys.path.append(
"/Users/dananjayaliyanage/git/Taweret/subpackages/js-sims-bayes/src")
# Imports from Jetscape code. Need to load the saved emulators.
# from configurations import *
# from emulator import *
# from bayes_mcmc import normed_mvn_loglike
eps = 1e-15
def normed_mvn_loglike(y, cov):
    r"""
    Evaluate the multivariate-normal log-likelihood for difference vector `y`
    and covariance matrix `cov`:
    .. math::
        log_p = -\frac{1}{2}[y^T C^{-1} y + \mathrm{log}(\mathrm{det}(C))]
        + const.
    This likelihood IS NORMALIZED.
    The normalization const :math:`= -\frac{n}{2}\mathrm{log}(2\pi)`,
    where :math:`n` is the dimensionality.
    Arguments `y` and `cov` MUST be np.arrays with dtype == float64 and shapes
    (n) and (n, n), respectively. These requirements are NOT CHECKED.
    The calculation follows algorithm 2.1 in Rasmussen and Williams (Gaussian
    Processes for Machine Learning).

    Raises
    ------
    ValueError
        if LAPACK reports an illegal argument value.
    numpy.linalg.LinAlgError
        if `cov` is not positive definite.
    """
    # Compute the Cholesky decomposition of the covariance.
    # Use bare LAPACK function to avoid scipy.linalg wrapper overhead.
    L, info = lapack.dpotrf(cov, clean=False)
    if info < 0:
        raise ValueError(
            'lapack dpotrf error: '
            'the {}-th argument had an illegal value'.format(-info)
        )
    elif info > 0:
        # BUGFIX: this branch previously tested `info < 0` again, making the
        # not-positive-definite error unreachable; dpotrf signals that case
        # with a POSITIVE info value.
        raise np.linalg.LinAlgError(
            'lapack dpotrf error: '
            'the leading minor of order {} is not positive definite'
            .format(info)
        )
    # Solve for alpha = cov^-1.y using the Cholesky decomp.
    alpha, info = lapack.dpotrs(L, y)
    if info != 0:
        raise ValueError(
            'lapack dpotrs error: '
            'the {}-th argument had an illegal value'.format(-info)
        )
    n = len(y)
    # Normalization constant -(n/2)*log(2*pi).  BUGFIX: the previous code
    # computed -n / (2*log(2*pi)), dividing by the log instead of
    # multiplying, which mis-scaled the constant.
    norm_const = -0.5 * n * np.log(2. * np.pi)
    # log(det(C)) = 2 * sum(log(diag(L))), hence a single (not 0.5) factor
    # on the log-diagonal sum below.
    return -.5 * np.dot(y, alpha) - np.log(L.diagonal()).sum() + norm_const
def normal_log_likelihood_elementwise(
        model: object,
        x_exp: np.ndarray,
        y_exp: np.ndarray,
        y_err: np.ndarray,
        model_param=np.array(
            [])) -> np.ndarray:
    """
    Predict the normal log-likelihood for each experimental data point.

    Parameters
    ----------
    model : object
        model object with an ``evaluate`` method
    x_exp : np.1darray
        input parameter values for experimental data
    y_exp : np.1darray
        mean of the experimental data
    y_err : np.1darray
        standard deviation of the experimental data
    model_param : np.1darray, optional
        calibration parameters forwarded to ``model.evaluate``;
        omitted from the call when empty

    Returns
    -------
    np.1darray
        log of the normal pdf evaluated at each data point
    """
    # Call the model exactly once.  The previous version caught
    # BaseException (which also swallows KeyboardInterrupt) and then
    # re-evaluated the model just to unpack a 3-tuple return, doubling any
    # side effects.  Instead, accept either (predictions, model_err) or
    # (predictions, model_err, extra) and use the first two values.
    if model_param.size == 0:
        result = model.evaluate(x_exp)
    else:
        result = model.evaluate(x_exp, model_param)
    predictions, model_err = result[0], result[1]
    # Total uncertainty: experimental and model errors add in quadrature.
    sigma = np.sqrt(np.square(y_err) + np.square(model_err))
    diff = -0.5 * np.square((predictions.flatten() - y_exp) / sigma) \
        - 0.5 * np.log(2 * np.pi) - np.log(sigma)
    return diff
def normal_likelihood_elementwise(
        model: object,
        x_exp: np.ndarray,
        y_exp: np.ndarray,
        y_err: np.ndarray,
        model_param=np.array(
            [])) -> np.ndarray:
    """
    Predict the normal likelihood for each experimental data point.

    Parameters
    ----------
    model : object
        model object with an ``evaluate`` method
    x_exp : np.1darray
        input parameter values for experimental data
    y_exp : np.1darray
        mean of the experimental data
    y_err : np.1darray
        standard deviation of the experimental data
    model_param : np.1darray, optional
        calibration parameters forwarded to ``model.evaluate``;
        omitted from the call when empty

    Returns
    -------
    np.1darray
        normal pdf evaluated at each data point
    """
    # Call the model exactly once.  The previous version caught
    # BaseException (which also swallows KeyboardInterrupt) and then
    # re-evaluated the model just to unpack a 3-tuple return, doubling any
    # side effects.  Instead, accept either (predictions, model_err) or
    # (predictions, model_err, extra) and use the first two values.
    if model_param.size == 0:
        result = model.evaluate(x_exp)
    else:
        result = model.evaluate(x_exp, model_param)
    predictions, model_err = result[0], result[1]
    # Total uncertainty: experimental and model errors add in quadrature.
    sigma = np.sqrt(np.square(y_err) + np.square(model_err))
    pre_factor = np.sqrt(2.0 * np.pi) * sigma
    diff = -0.5 * np.square((predictions.flatten() - y_exp) / sigma)
    return np.exp(diff) / pre_factor
def mixture_function(
        method: str,
        x: np.ndarray,
        mixture_params: np.ndarray,
        prior=None) -> np.ndarray:
    """
    Predict the weights from the mixture function at the given input
    parameter values x.

    Parameters
    ----------
    method : str
        name of the linear mixing function method; one of 'sigmoid',
        'step', 'addstep', 'addstepasym', 'cdf', 'beta', 'dirichlet'
        (the historical misspelling 'dirchlet' is still accepted),
        'switchcos', 'calibrate_model_1', 'calibrate_model_2'
    x : np.1darray
        input parameter values
    mixture_params : np.1darray
        parameters that decide the shape of the mixture function
    prior : (optional) bilby prior object
        Used only in step mixing
        to deal with negative values of the input.

    Returns
    -------
    tuple
        weights (w, 1 - w) for the two models; the Dirichlet method
        instead returns the sampled weight vector directly.

    Raises
    ------
    Exception
        if the method name is unknown, or a step location is negative
        where a non-negative value is required
    """
    if method == 'sigmoid':
        theta_0, theta_1 = mixture_params
        w = expit((x - theta_0) / theta_1)
        return w, 1 - w
    elif method == 'step':
        x_0 = mixture_params[0]
        if x_0 >= 0:
            # If x is less than x_0 it's 1. Otherwise 0.
            w = np.array(
                [1 - (eps) if xi <= x_0 else eps for xi in x]).flatten()
        elif x_0 < 0:
            # A negative x_0 anchors the step to the prior's upper bound.
            max_num = prior['step_0'].maximum
            bound = max_num + x_0
            # If x is greater than max_num - |x_0| it's 1. Otherwise 0.
            w = np.array(
                [1 - (eps) if xi >= bound else eps for xi in x]).flatten()
        return w, 1 - w
    elif method == 'addstep':
        x_0, alpha = mixture_params
        if x_0 >= 0:
            # If x is less than x_0 it's 1. Otherwise 0.
            w1 = np.array(
                [1 - (eps) if xi <= x_0 else eps for xi in x]).flatten()
            max_num = prior['addstep_0'].maximum
            bound = max_num - x_0
            # If x is greater than max_num - |x_0| it's 1. Otherwise 0.
            w2 = np.array(
                [1 - (eps) if xi >= bound else eps for xi in x]).flatten()
        else:
            raise Exception(f'x_0 has to be non negative but provided {x_0}')
        # Convex combination of the two step functions.
        w = alpha * w1 + (1 - alpha) * w2
        return w, 1 - w
    elif method == 'addstepasym':
        x_0, x_1, alpha = mixture_params
        if x_0 >= 0:
            # If x is less than x_0 it's 1. Otherwise 0.
            w1 = np.array(
                [1 - (eps) if xi <= x_0 else eps for xi in x]).flatten()
        else:
            raise Exception(f'x_0 has to be non negative but provided {x_0}')
        if x_1 >= 0:
            # If x is greater than x_1 it's 1. Otherwise 0.
            w2 = np.array(
                [1 - (eps) if xi >= x_1 else eps for xi in x]).flatten()
        else:
            raise Exception(f'x_1 has to be non negative but provided {x_1}')
        # Convex combination of the two (asymmetric) step functions.
        w = alpha * w1 + (1 - alpha) * w2
        return w, 1 - w
    elif method == 'cdf':
        theta_0, theta_1 = mixture_params
        # Affine transform of the input, then the standard-normal CDF.
        x = theta_0 + theta_1 * x
        w = norm.cdf(x)
        w = np.array(w).flatten()
        return w, 1 - w
    elif method == 'beta':
        # Weight is a random draw from a Beta distribution (not a
        # deterministic function of x).
        w = beta.rvs(*mixture_params)
        return w, 1 - w
    elif method in ('dirchlet', 'dirichlet'):
        # Historical misspelling 'dirchlet' accepted for backward
        # compatibility.  Returns the full sampled weight vector.
        w = dirichlet.rvs(mixture_params)
        return w
    elif method == 'switchcos':
        g1, g2, g3 = mixture_params
        w = np.array(list(map(lambda x: switchcos(g1, g2, g3, x), x)))
        return w, 1 - w
    elif method == 'calibrate_model_1':
        # Degenerate mixture: all weight on the first model.
        w = np.ones(len(x))
        return w, 1 - w
    elif method == 'calibrate_model_2':
        # Degenerate mixture: all weight on the second model.
        w = np.ones(len(x))
        return 1 - w, w
    else:
        raise Exception('Method is not available for `mixture function`')
def switchcos(g1, g2, g3, x):
    """Switchcos function in Alexandra's Samba module
    link https://github.com/asemposki/SAMBA/

    Piecewise weight: 1 for x <= g1, a half-cosine ramp from 1 down to
    0.5 on (g1, g3], a second half-cosine ramp from 0.5 down to 0 on
    (g3, g2), and 0 for x >= g2.

    Parameters:
    -----------
    g1 : float
        switching value from left constant to first cosine
    g2 : float
        switching value from second cosine to right constant
    g3 : float
        switching value from first cosine to second cosine
    x : float
        the input parameter value to calculate the weight

    Raises
    ------
    Exception
        if the ordering g1 <= g3 <= g2 is violated
    """
    # The guard enforces g1 <= g3 <= g2.  BUGFIX: the old message claimed
    # 'g1 > g3 > g2', the opposite of the required ordering.
    if g1 > g2 or g2 < g3 or g1 > g3:
        raise Exception(
            f'Expected g1 <= g3 <= g2 but given g1:{g1} g2:{g2} g3:{g3}')
    if x <= g1:
        return 1.0
    elif x <= g3:
        # First ramp: 1 -> 0.5 as x goes g1 -> g3.
        return (1.0 + np.cos((np.pi / 2.0) * ((x - g1) / (g3 - g1)))) / 2.0
    elif x < g2:
        # Second ramp: 0.5 -> 0 as x goes g3 -> g2.
        return 0.5 + np.cos((np.pi / 2.0) *
                            (1.0 + ((x - g3) / (g2 - g3)))) / 2.0
    else:
        return 0.0