"""
Trunchen version of noisemodel
Goal is to compute the full n-band covariance matrix for each model
"""
import numpy as np
from scipy.spatial import cKDTree
from tqdm import tqdm
from beast.observationmodel.noisemodel.noisemodel import NoiseModel
from beast.observationmodel.vega import Vega
__all__ = ["MultiFilterASTs"]
[docs]
class MultiFilterASTs(NoiseModel):
""" Implement a noise model where the ASTs are provided as a single table
Attributes
----------
astfile: str
file containing the ASTs
filters: sequence(str)
sequence of filter names
"""
def __init__(self, astfile, filters, *args, **kwargs):
NoiseModel.__init__(self, astfile, *args, **kwargs)
self.setFilters(filters)
# needs updating
self._input_fluxes = None
self._biases = None
self._completenesses = None
self._cov_matrices = None
self._corr_matrices = None
[docs]
def setFilters(self, filters):
""" set the filters and update the vega reference for the conversions
Parameters
----------
filters: sequence
list of filters using the internally normalized namings
"""
self.filters = filters
# ASTs inputs are in vega mag whereas models are in flux units
# for optimization purpose: pre-compute
with Vega() as v:
_, vega_flux, _ = v.getFlux(filters)
self.vega_flux = vega_flux
def _calc_ast_cov(self, indxs, filters, return_all=False):
"""
The NxN-dimensional covariance matrix and N-dimensional bias vector are
calculated from M independent ASTs computed for N bands
Parameters
----------
indxs : index array giving the ASTs assocaited with a single
model SED
filters : base filter names in the AST file
Keywords
--------
return_all : True/False
Returns
-------
if return_all = False
(cov_mat, bias, compls)
else
(cov_mat, bias, stddevs, corr_mat, diffs, ifluxes, compls)
cov_mat : NxN dim numpy array
covariance matrix in flux units
bias : N dim numpy vector
vector of the biases in each filter
stddevs : N dim numpy vector
vector of standard deviations in each filter
corr_mat : NxN dim numpy array
correlation matrix
diffs : KxN dim numpy vector
raw flux differences for N filters and K AST instances
ifluxes : N dim numpy vector
input fluxes of the AST in each filter
compl : float
AST completeness for this model
"""
# set the asts for this star using the input index array
asts = self.data[indxs]
# now check that the source was recovered in at least 1 band
# this replicates how the observed catalog is created
n_asts = len(asts)
gtindxs = np.full((n_asts), 1)
for k in range(n_asts):
cgood = 0
for cfilter in filters:
if asts[cfilter + "_VEGA"][k] < 90:
cgood = cgood + 1
gtindxs[k] = cgood
(indxs,) = np.where(gtindxs > 0)
n_indxs = len(indxs)
if n_indxs <= 5:
return False
# completeness
compl = float(n_indxs) / float(n_asts)
# setup the variables for output
n_filters = len(filters)
ifluxes = np.zeros((n_filters), dtype=np.float32)
diffs = np.zeros((n_filters, n_indxs), dtype=np.float32)
biases = np.zeros((n_filters), dtype=np.float32)
cov_matrix = np.full((n_filters, n_filters), 0.0, dtype=np.float32)
for ck, cfilter in enumerate(filters):
ifluxes[ck] = (
np.power(10.0, -0.4 * asts[cfilter + "_IN"][indxs[0]])
* self.vega_flux[ck]
)
# compute the difference vector between the input and output fluxes
# note that the input fluxes are in magnitudes and the
# output fluxes in normalized vega fluxes
diffs[ck, :] = (
asts[cfilter + "_RATE"][indxs] * self.vega_flux[ck] - ifluxes[ck]
)
# compute the bias and standard deviations around said bias
biases[ck] = np.mean(diffs[ck, :])
# compute the covariance matrix
for ck in range(n_filters):
for dk in range(ck, n_filters):
for ci in range(n_indxs):
cov_matrix[ck, dk] += (diffs[ck, ci] - biases[ck]) * (
diffs[dk, ci] - biases[dk]
)
# fill in the symmetric terms
cov_matrix[dk, ck] = cov_matrix[ck, dk]
cov_matrix /= n_indxs - 1
stddevs = np.sqrt(np.diagonal(cov_matrix))
# compute the corrleation matrix
corr_matrix = np.array(cov_matrix)
for ck in range(n_filters):
for dk in range(ck, n_filters):
if stddevs[ck] * stddevs[dk] > 0:
corr_matrix[ck, dk] /= stddevs[ck] * stddevs[dk]
else:
corr_matrix[ck, dk] = 0.0
# fill in the symmetric terms
corr_matrix[dk, ck] = corr_matrix[ck, dk]
if return_all:
return (cov_matrix, biases, stddevs, corr_matrix, diffs, ifluxes, compl)
else:
return (cov_matrix, biases, compl)
def _calc_all_ast_cov(self, filters, progress=True):
"""
The covariance matrices and biases are calculated for all the
independent models in the AST file
Parameters
----------
filters : filter names for the AST data
Keywords
--------
progress: bool, optional
if set, display a progress bar
Returns
-------
(cov_mats, biases, completenesses, corr_mats, ifluxes)
cov_mats : KxNxN dim numpy array
K AST covariance matrices in flux units
bias : KxN dim numpy vector
K vectors of the biases in each filter
completenesses : K dim numpy vector
completeness versus model
corr_mats : KxNxN dim numpy array
K AST correlation matrices
ifluxes : KxN dim numpy vector
K vectors of the input fluxes in each filter
"""
# find the stars by using unique values of the magnitude values
# in filtername
filtername = filters[-1] + "_IN"
uvals, ucounts = np.unique(self.data[filtername], return_counts=True)
n_models = len(uvals)
# setup the output
n_filters = len(filters)
all_covs = np.zeros((n_models, n_filters, n_filters), dtype=np.float64)
all_corrs = np.zeros((n_models, n_filters, n_filters), dtype=np.float32)
all_biases = np.zeros((n_models, n_filters), dtype=np.float64)
all_ifluxes = np.zeros((n_models, n_filters), dtype=np.float32)
all_compls = np.zeros((n_models), dtype=np.float32)
ast_minmax = np.zeros((2, n_filters), dtype=np.float64)
ast_minmax[0, :] = 1e99
ast_minmax[1, :] = 1e-99
# loop over the unique set of models and
# calculate the covariance matrix using the ASTs for this model
good_asts = np.full((n_models), True)
if progress is True:
it = tqdm(list(range(n_models)), desc="Calculating AST covariance matrices")
else:
it = list(range(n_models))
for i in it:
# find all the ASTs for this model
(indxs,) = np.where(self.data[filtername] == uvals[i])
n_asts = len(indxs)
if n_asts > 5:
results = self._calc_ast_cov(indxs, filters, return_all=True)
if results:
all_covs[i, :, :] = results[0]
all_biases[i, :] = results[1]
all_corrs[i, :, :] = results[3]
all_ifluxes[i, :] = results[5]
all_compls[i] = results[6]
for k in range(n_filters):
ast_minmax[0, k] = min(ast_minmax[0, k], all_ifluxes[i, k])
ast_minmax[1, k] = max(ast_minmax[1, k], all_ifluxes[i, k])
else:
good_asts[i] = False
(indxs,) = np.where(good_asts)
return (
all_covs[indxs, :, :],
all_biases[indxs, :],
all_compls[indxs],
all_corrs[indxs, :, :],
all_ifluxes[indxs, :],
ast_minmax,
)
[docs]
def process_asts(self, filters):
"""
Process all the AST results creating average biases and
covariance matrices for each model SED.
Also, prep for the interpolation by setting up the kd-tree
Parameters
----------
filters : filter names for the AST data
Returns
-------
N/A.
"""
results = self._calc_all_ast_cov(filters)
self._cov_matrices = results[0]
self._biases = results[1]
self._completenesses = results[2]
self._corr_matrices = results[3]
self._input_fluxes = results[4]
self._minmax_asts = results[5]
print("building kd-tree...")
self._kdtree = cKDTree(np.log10(self._input_fluxes))
print("...done")
[docs]
def __call__(self, sedgrid, generic_absflux_a_matrix=None, progress=True):
"""
Interpolate the results of the ASTs on the model grid
Parameters
----------
sedgrid: beast.core.grid type
model grid to interpolate AST results on
Returns
-------
progress: bool, optional
if set, display a progress bar
"""
flux = sedgrid.seds
if generic_absflux_a_matrix is not None:
model_absflux_cov = False
print("using model independent absflux cov matrix")
elif (sedgrid.cov_diag is not None) & (sedgrid.cov_offdiag is not None):
model_absflux_cov = True
absflux_cov_diag = sedgrid.cov_diag
absflux_cov_offdiag = sedgrid.cov_offdiag
print("using model dependent absflux cov matrix")
else:
model_absflux_cov = False
n_models, n_filters = flux.shape
n_offdiag = ((n_filters ** 2) - n_filters) / 2
if n_filters != len(self.filters):
raise AttributeError(
"the grid of models does not seem to"
+ "be defined with the same number of filters"
)
biases = np.zeros((n_models, n_filters), dtype=np.float64)
sigmas = np.zeros((n_models, n_filters), dtype=np.float64)
cov_diag = np.zeros((n_models, n_filters), dtype=np.float64)
cov_offdiag = np.zeros((n_models, n_offdiag), dtype=np.float64)
icov_diag = np.zeros((n_models, n_filters), dtype=np.float64)
icov_offdiag = np.zeros((n_models, n_offdiag), dtype=np.float64)
q_norm = np.zeros((n_models), dtype=np.float64)
compls = np.zeros((n_models), dtype=float)
if progress is True:
it = tqdm(list(range(n_models)), desc="Evaluating model")
else:
it = list(range(n_models))
for i in it:
# AST results are in vega fluxes
cur_flux = flux[i, :]
# find the 10 nearest neighbors to the model SED
result = self._kdtree.query(np.log10(cur_flux), 10)
dist = result[0]
indxs = result[1]
# check if the distance is very small, set to a reasonable value
(tindxs,) = np.where(dist < 0.01)
if len(tindxs) > 0:
dist[tindxs] = 0.01
# compute the interpolated covariance matrix
# use the distances to generate weights for the sum
dist_weights = 1.0 / dist
dist_weights /= np.sum(dist_weights)
cur_cov_matrix = np.average(
self._cov_matrices[indxs, :, :], axis=0, weights=dist_weights
)
# add in the absflux covariance matrix
# unpack off diagonal terms the same way they were packed
if model_absflux_cov:
m = 0
cur_cov_matrix[n_filters - 1, n_filters - 1] += absflux_cov_diag[
i, n_filters - 1
]
for k in range(n_filters - 1):
cur_cov_matrix[k, k] += absflux_cov_diag[i, k]
for ll in range(k + 1, n_filters):
cur_cov_matrix[k, ll] += absflux_cov_offdiag[i, m]
cur_cov_matrix[ll, k] += absflux_cov_offdiag[i, m]
m += 1
elif generic_absflux_a_matrix is not None:
for k in range(n_filters):
for ll in range(n_filters):
cur_cov_matrix[k, ll] += (
generic_absflux_a_matrix[k, ll] * cur_flux[k] * cur_flux[ll]
)
# compute the interpolated biases
biases[i, :] = np.average(
self._biases[indxs, :], axis=0, weights=dist_weights
)
# compute the interpolated completeness
compls[i] = np.average(self._completenesses[indxs], weights=dist_weights)
# save the straight uncertainties
sigmas[i, :] = np.sqrt(np.diagonal(cur_cov_matrix))
# invert covariance matrix
inv_cur_cov_matrix = np.linalg.inv(cur_cov_matrix)
# save the diagnonal and packed version of non-diagonal terms
m = 0
icov_diag[i, n_filters - 1] = inv_cur_cov_matrix[
n_filters - 1, n_filters - 1
]
cov_diag[i, n_filters - 1] = cur_cov_matrix[n_filters - 1, n_filters - 1]
for k in range(n_filters - 1):
icov_diag[i, k] = inv_cur_cov_matrix[k, k]
cov_diag[i, k] = cur_cov_matrix[k, k]
for ll in range(k + 1, n_filters):
icov_offdiag[i, m] = inv_cur_cov_matrix[k, ll]
cov_offdiag[i, m] = cur_cov_matrix[k, ll]
m += 1
# save the log of the determinat for normalization
# the ln(det) is calculated and saved as this is what will
# be used in the actual calculation
# norm = 1.0/sqrt(Q)
det = np.linalg.slogdet(cur_cov_matrix)
if det[0] <= 0:
print("something bad happened")
print("determinant of covarinace matrix is zero or negative")
print(det)
q_norm[i] = -0.5 * det[1]
return (
biases,
sigmas,
compls,
q_norm,
icov_diag,
icov_offdiag,
cov_diag,
cov_offdiag,
)