# Source code for beast.observationmodel.noisemodel.trunchen

"""
Trunchen version of noisemodel
Goal is to compute the full n-band covariance matrix for each model
"""
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import numpy as np

from scipy.spatial import cKDTree

from .noisemodel import NoiseModel
from ..vega import Vega

from ...tools.pbar import Pbar

__all__ = ['MultiFilterASTs']


class MultiFilterASTs(NoiseModel):
    """ Implement a noise model where the ASTs are provided as a single table

    Attributes
    ----------
    astfile: str
        file containing the ASTs
    filters: sequence(str)
        sequence of filter names
    """
    def __init__(self, astfile, filters, *args, **kwargs):
        NoiseModel.__init__(self, astfile, *args, **kwargs)
        self.setFilters(filters)

        # needs updating
        self._input_fluxes = None
        self._biases = None
        self._completenesses = None
        self._cov_matrices = None
        self._corr_matrices = None

    def setFilters(self, filters):
        """ set the filters and update the vega reference for the conversions

        Parameters
        ----------
        filters: sequence
            list of filters using the internally normalized namings
        """
        self.filters = filters

        # ASTs inputs are in vega mag whereas models are in flux units
        # for optimization purpose: pre-compute the vega reference fluxes
        with Vega() as v:
            _, vega_flux, _ = v.getFlux(filters)

        self.vega_flux = vega_flux

    def _calc_ast_cov(self, indxs, filters, return_all=False):
        """
        The NxN-dimensional covariance matrix and N-dimensional bias vector
        are calculated from M independent ASTs computed for N bands

        Parameters
        ----------
        indxs : index array
            gives the ASTs associated with a single model SED
        filters : sequence(str)
            base filter names in the AST file

        Keywords
        --------
        return_all : bool
            if True, return the full set of diagnostics (see Returns)

        Returns
        -------
        if return_all is False
            (cov_mat, bias, compl)
        else
            (cov_mat, bias, stddevs, corr_mat, diffs, ifluxes, compl)

        cov_mat : NxN dim numpy array
            covariance matrix in flux units
        bias : N dim numpy vector
            vector of the biases in each filter
        stddevs : N dim numpy vector
            vector of standard deviations in each filter
        corr_mat : NxN dim numpy array
            correlation matrix
        diffs : NxK dim numpy array
            raw flux differences for N filters and K AST instances
        ifluxes : N dim numpy vector
            input fluxes of the AST in each filter
        compl : float
            AST completeness for this model

        Returns False instead when fewer than 6 ASTs were recovered.
        """
        # set the asts for this star using the input index array
        asts = self.data[indxs]

        # now check that the source was recovered in at least 1 band
        #   this replicates how the observed catalog is created
        # (a recovered band has VEGA mag < 90 by convention in the AST file)
        n_asts = len(asts)
        gtindxs = np.full((n_asts), 1)
        for k in range(n_asts):
            cgood = 0
            for cfilter in filters:
                if asts[cfilter + '_VEGA'][k] < 90:
                    cgood = cgood + 1
            gtindxs[k] = cgood

        indxs, = np.where(gtindxs > 0)
        n_indxs = len(indxs)
        # too few recovered ASTs -> cannot compute a meaningful covariance
        if n_indxs <= 5:
            return False

        # completeness = fraction of ASTs recovered in at least one band
        compl = float(n_indxs)/float(n_asts)

        # setup the variables for output
        n_filters = len(filters)
        ifluxes = np.empty((n_filters), dtype=np.float32)
        diffs = np.empty((n_filters, n_indxs), dtype=np.float32)
        biases = np.empty((n_filters), dtype=np.float32)

        for ck, cfilter in enumerate(filters):
            # input fluxes are in magnitudes: convert to normalized vega fluxes
            ifluxes[ck] = (np.power(10.0,
                                    -0.4*asts[cfilter + '_IN'][indxs[0]]) *
                           self.vega_flux[ck])
            # compute the difference vector between the input and output
            # fluxes; output fluxes are already normalized vega fluxes
            diffs[ck, :] = (asts[cfilter + '_RATE'][indxs] *
                            self.vega_flux[ck] - ifluxes[ck])
            # compute the bias (the standard deviations around said bias
            # come from the covariance diagonal below)
            biases[ck] = np.mean(diffs[ck, :])

        # compute the covariance matrix
        # vectorized equivalent of the elementwise sum
        #   cov[k, l] = sum_i (d[k,i] - b[k])*(d[l,i] - b[l]) / (n - 1)
        centered = diffs - biases[:, np.newaxis]
        cov_matrix = np.dot(centered, centered.T) / (n_indxs - 1)

        stddevs = np.sqrt(np.diagonal(cov_matrix))

        # compute the correlation matrix
        corr_matrix = np.array(cov_matrix)
        for ck in range(n_filters):
            for dk in range(ck, n_filters):
                if stddevs[ck]*stddevs[dk] > 0:
                    corr_matrix[ck, dk] /= stddevs[ck]*stddevs[dk]
                else:
                    # zero variance in either band -> correlation undefined
                    corr_matrix[ck, dk] = 0.0
                # fill in the symmetric terms
                corr_matrix[dk, ck] = corr_matrix[ck, dk]

        if return_all:
            return (cov_matrix, biases, stddevs, corr_matrix, diffs,
                    ifluxes, compl)
        else:
            return (cov_matrix, biases, compl)

    def _calc_all_ast_cov(self, filters, progress=True):
        """
        The covariance matrices and biases are calculated for all the
        independent models in the AST file

        Parameters
        ----------
        filters : sequence(str)
            filter names for the AST data

        Keywords
        --------
        progress: bool, optional
            if set, display a progress bar

        Returns
        -------
        (cov_mats, biases, completenesses, corr_mats, ifluxes, minmax)

        cov_mats : KxNxN dim numpy array
            K AST covariance matrices in flux units
        biases : KxN dim numpy array
            K vectors of the biases in each filter
        completenesses : K dim numpy vector
            completeness versus model
        corr_mats : KxNxN dim numpy array
            K AST correlation matrices
        ifluxes : KxN dim numpy array
            K vectors of the input fluxes in each filter
        minmax : 2xN dim numpy array
            min ([0, :]) and max ([1, :]) input AST flux per filter
        """
        # find the stars by using unique values of the magnitude values
        # in filtername (all ASTs for one model share the same input mag)
        filtername = filters[-1] + '_IN'
        uvals, ucounts = np.unique(self.data[filtername], return_counts=True)
        n_models = len(uvals)

        # setup the output
        n_filters = len(filters)
        all_covs = np.empty((n_models, n_filters, n_filters),
                            dtype=np.float64)
        all_corrs = np.empty((n_models, n_filters, n_filters),
                             dtype=np.float32)
        all_biases = np.empty((n_models, n_filters), dtype=np.float64)
        all_ifluxes = np.empty((n_models, n_filters), dtype=np.float32)
        all_compls = np.empty((n_models), dtype=np.float32)
        # running min/max of the AST input fluxes; start from sentinels
        ast_minmax = np.empty((2, n_filters), dtype=np.float64)
        ast_minmax[0, :] = 1e99
        ast_minmax[1, :] = 1e-99

        # loop over the unique set of models and
        # calculate the covariance matrix using the ASTs for this model
        good_asts = np.full((n_models), True)
        if progress is True:
            it = Pbar(desc='Calculating AST Covariance ' +
                      'Matrices').iterover(list(range(n_models)))
        else:
            it = list(range(n_models))

        for i in it:
            # find all the ASTs for this model
            indxs, = np.where(self.data[filtername] == uvals[i])
            n_asts = len(indxs)

            # _calc_ast_cov returns False when too few ASTs were recovered;
            # treat "too few ASTs to even try" the same way so that rows of
            # the output arrays are never left uninitialized but marked good
            results = False
            if n_asts > 5:
                results = self._calc_ast_cov(indxs, filters, return_all=True)
            if results:
                all_covs[i, :, :] = results[0]
                all_biases[i, :] = results[1]
                all_corrs[i, :, :] = results[3]
                all_ifluxes[i, :] = results[5]
                all_compls[i] = results[6]
                ast_minmax[0, :] = np.minimum(ast_minmax[0, :],
                                              all_ifluxes[i, :])
                ast_minmax[1, :] = np.maximum(ast_minmax[1, :],
                                              all_ifluxes[i, :])
            else:
                good_asts[i] = False

        # only return the models with valid AST results
        indxs, = np.where(good_asts)

        return (all_covs[indxs, :, :], all_biases[indxs, :],
                all_compls[indxs], all_corrs[indxs, :, :],
                all_ifluxes[indxs, :], ast_minmax)

    def process_asts(self, filters):
        """
        Process all the AST results creating average biases and
        covariance matrices for each model SED.
        Also, prep for the interpolation by setting up the kd-tree

        Parameters
        ----------
        filters : sequence(str)
            filter names for the AST data

        Returns
        -------
        N/A.  Results are stored on the instance (_cov_matrices, _biases,
        _completenesses, _corr_matrices, _input_fluxes, _minmax_asts,
        _kdtree).
        """
        results = self._calc_all_ast_cov(filters)

        self._cov_matrices = results[0]
        self._biases = results[1]
        self._completenesses = results[2]
        self._corr_matrices = results[3]
        self._input_fluxes = results[4]
        self._minmax_asts = results[5]

        # kd-tree in log10(flux) space enables fast nearest-neighbor
        # lookups when interpolating onto a model grid in __call__
        print('building kd-tree...')
        self._kdtree = cKDTree(np.log10(self._input_fluxes))
        print('...done')

    def __call__(self, sedgrid, generic_absflux_a_matrix=None,
                 progress=True):
        """
        Interpolate the results of the ASTs on the model grid

        Parameters
        ----------
        sedgrid: beast.core.grid type
            model grid to interpolate AST results on

        Keywords
        --------
        generic_absflux_a_matrix : NxN numpy array, optional
            model-independent absolute flux covariance "A" matrix; if given
            it takes precedence over any grid-supplied absflux covariance
        progress: bool, optional
            if set, display a progress bar

        Returns
        -------
        (biases, sigmas, compls, q_norm, icov_diag, icov_offdiag,
         cov_diag, cov_offdiag) : per-model interpolated noise quantities

        Raises
        ------
        AttributeError
            if the grid's filter count differs from this noise model's
        """
        flux = sedgrid.seds

        # decide which absolute flux covariance information to fold in
        if generic_absflux_a_matrix is not None:
            # caller-supplied generic matrix takes precedence
            model_absflux_cov = False
            print('using model independent absflux cov matrix')
        elif ((sedgrid.cov_diag is not None) and
              (sedgrid.cov_offdiag is not None)):
            model_absflux_cov = True
            absflux_cov_diag = sedgrid.cov_diag
            absflux_cov_offdiag = sedgrid.cov_offdiag
            print('using model dependent absflux cov matrix')
        else:
            model_absflux_cov = False
            print('not using any absflux cov matrix')

        n_models, n_filters = flux.shape
        # number of packed off-diagonal terms of an NxN symmetric matrix;
        # integer division is required: this is used as an array dimension
        n_offdiag = ((n_filters**2) - n_filters) // 2

        if n_filters != len(self.filters):
            raise AttributeError('the grid of models does not seem to '
                                 'be defined with the same number of filters')

        biases = np.empty((n_models, n_filters), dtype=np.float64)
        sigmas = np.empty((n_models, n_filters), dtype=np.float64)
        cov_diag = np.empty((n_models, n_filters), dtype=np.float64)
        cov_offdiag = np.empty((n_models, n_offdiag), dtype=np.float64)
        icov_diag = np.empty((n_models, n_filters), dtype=np.float64)
        icov_offdiag = np.empty((n_models, n_offdiag), dtype=np.float64)
        q_norm = np.empty((n_models), dtype=np.float64)
        compls = np.empty((n_models), dtype=float)

        if progress is True:
            it = Pbar(desc='Evaluating model').iterover(
                list(range(n_models)))
        else:
            it = list(range(n_models))

        for i in it:
            # AST results are in vega fluxes
            cur_flux = flux[i, :]

            # find the 10 nearest neighbors to the model SED
            result = self._kdtree.query(np.log10(cur_flux), 10)

            dist = result[0]
            indxs = result[1]

            # check if the distance is very small, set to a reasonable value
            # (avoids a single near-exact match dominating the weights)
            tindxs, = np.where(dist < 0.01)
            if len(tindxs) > 0:
                dist[tindxs] = 0.01

            # compute the interpolated covariance matrix
            #    use the distances to generate weights for the sum
            dist_weights = 1.0/dist
            dist_weights /= np.sum(dist_weights)

            cur_cov_matrix = np.average(self._cov_matrices[indxs, :, :],
                                        axis=0,
                                        weights=dist_weights)

            # add in the absflux covariance matrix
            #   unpack off diagonal terms the same way they were packed
            if model_absflux_cov:
                m = 0
                cur_cov_matrix[n_filters-1, n_filters-1] += \
                    absflux_cov_diag[i, n_filters-1]
                for k in range(n_filters-1):
                    cur_cov_matrix[k, k] += absflux_cov_diag[i, k]
                    for l in range(k+1, n_filters):
                        cur_cov_matrix[k, l] += absflux_cov_offdiag[i, m]
                        cur_cov_matrix[l, k] += absflux_cov_offdiag[i, m]
                        m += 1
            elif generic_absflux_a_matrix is not None:
                # A-matrix scales with the outer product of the model fluxes
                for k in range(n_filters):
                    for l in range(n_filters):
                        cur_cov_matrix[k, l] += \
                            (generic_absflux_a_matrix[k, l] *
                             cur_flux[k]*cur_flux[l])

            # compute the interpolated biases
            biases[i, :] = np.average(self._biases[indxs, :],
                                      axis=0,
                                      weights=dist_weights)

            # compute the interpolated completeness
            compls[i] = np.average(self._completenesses[indxs],
                                   weights=dist_weights)

            # save the straight uncertainties
            sigmas[i, :] = np.sqrt(np.diagonal(cur_cov_matrix))

            # invert covariance matrix
            inv_cur_cov_matrix = np.linalg.inv(cur_cov_matrix)

            # save the diagonal and packed version of non-diagonal terms
            m = 0
            icov_diag[i, n_filters-1] = \
                inv_cur_cov_matrix[n_filters-1, n_filters-1]
            cov_diag[i, n_filters-1] = \
                cur_cov_matrix[n_filters-1, n_filters-1]
            for k in range(n_filters-1):
                icov_diag[i, k] = inv_cur_cov_matrix[k, k]
                cov_diag[i, k] = cur_cov_matrix[k, k]
                for l in range(k+1, n_filters):
                    icov_offdiag[i, m] = inv_cur_cov_matrix[k, l]
                    cov_offdiag[i, m] = cur_cov_matrix[k, l]
                    m += 1

            # save the log of the determinant for normalization
            #   the ln(det) is calculated and saved as this is what will
            #   be used in the actual calculation
            #     norm = 1.0/sqrt(Q)
            det = np.linalg.slogdet(cur_cov_matrix)
            if det[0] <= 0:
                print('something bad happened')
                print('determinant of covariance matrix is zero or negative')
                print(det)
            q_norm[i] = -0.5*det[1]

        return (biases, sigmas, compls, q_norm,
                icov_diag, icov_offdiag, cov_diag, cov_offdiag)