Source code for beast.observationmodel.noisemodel.trunchen

"""
Trunchen version of noisemodel
Goal is to compute the full 6-band covariance matrix for each model
"""
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import math
import numpy as np

from scipy.spatial import cKDTree

#from .noisemodel_trunchen import NoiseModel
from .noisemodel import NoiseModel
from ..vega import Vega

from ...tools.pbar import Pbar

__all__ = ['MultiFilterASTs']

[docs]class MultiFilterASTs(NoiseModel): """ Implement a noise model where the ASTs are provided as a single table Attributes ---------- astfile: str file containing the ASTs filters: sequence(str) sequence of filter names """ def __init__(self, astfile, filters, *args, **kwargs): NoiseModel.__init__(self, astfile, *args, **kwargs) self.setFilters(filters) # needs updating self._input_fluxes = None self._biases = None self._completenesses = None self._cov_matrices = None self._corr_matrices = None
[docs] def setFilters(self, filters): """ set the filters and update the vega reference for the conversions Parameters ---------- filters: sequence list of filters using the internally normalized namings """ self.filters = filters # ASTs inputs are in vega mag whereas models are in flux units # for optimization purpose: pre-compute with Vega() as v: _, vega_flux, _ = v.getFlux(filters) self.vega_flux = vega_flux
def _calc_ast_cov(self, indxs, filters, return_all=False): """ The NxN-dimensional covariance matrix and N-dimensional bias vector are calculated from M independent ASTs computed for N bands Parameters ---------- indxs : index array giving the ASTs assocaited with a single model SED filters : base filter names in the AST file Keywords -------- return_all : True/False Returns ------- if return_all = False (cov_mat, bias, compls) else (cov_mat, bias, stddevs, corr_mat, diffs, ifluxes, compls) cov_mat : NxN dim numpy array covariance matrix in flux units bias : N dim numpy vector vector of the biases in each filter stddevs : N dim numpy vector vector of standard deviations in each filter corr_mat : NxN dim numpy array correlation matrix diffs : KxN dim numpy vector raw flux differences for N filters and K AST instances ifluxes : N dim numpy vector input fluxes of the AST in each filter compl : float AST completeness for this model """ # set the asts for this star using the input index array asts = self.data[indxs] # now check that the source was recovered in at least 1 band # this replicates how the observed catalog is created n_asts = len(asts) gtindxs = np.full((n_asts),1) for k in range(n_asts): cgood = 0 for cfilter in filters: if asts[cfilter+'_VEGA'][k] < 90: cgood = cgood + 1 gtindxs[k] = cgood indxs, = np.where(gtindxs > 0) n_indxs = len(indxs) if n_indxs <= 5: return False # completeness compl = float(n_indxs)/float(n_asts) # setup the variables for output n_filters = len(filters) ifluxes = np.empty((n_filters),dtype=np.float32) diffs = np.empty((n_filters,n_indxs),dtype=np.float32) biases = np.empty((n_filters),dtype=np.float32) cov_matrix = np.full((n_filters, n_filters),0.0,dtype=np.float32) for ck, cfilter in enumerate(filters): ifluxes[ck] = np.power(10.0,-0.4*asts[cfilter+'_IN'][indxs[0]])* \ self.vega_flux[ck] # compute the difference vector between the input and output fluxes # note that the input fluxes are in magnitudes and the # output fluxes in normalized vega fluxes diffs[ck,:] = asts[cfilter+'_RATE'][indxs]*self.vega_flux[ck] - \ ifluxes[ck] # compute the bias and standard deviations around said bias biases[ck] = np.mean(diffs[ck,:]) # compute the covariance matrix for ck in range(n_filters): for dk in range(ck,n_filters): for ci in range(n_indxs): cov_matrix[ck,dk] += (diffs[ck,ci] - biases[ck])* \ (diffs[dk,ci] - biases[dk]) # fill in the symmetric terms cov_matrix[dk,ck] = cov_matrix[ck,dk] cov_matrix /= (n_indxs - 1) stddevs = np.sqrt(np.diagonal(cov_matrix)) # compute the corrleation matrix corr_matrix = np.array(cov_matrix) for ck in range(n_filters): for dk in range(ck,n_filters): if stddevs[ck]*stddevs[dk] > 0: corr_matrix[ck,dk] /= stddevs[ck]*stddevs[dk] else: corr_matrix[ck,dk] = 0.0 # fill in the symmetric terms corr_matrix[dk,ck] = corr_matrix[ck,dk] if return_all: return (cov_matrix, biases, stddevs, corr_matrix, diffs, ifluxes, compl) else: return (cov_matrix, biases, compl) def _calc_all_ast_cov(self, filters, progress=True): """ The covariance matrices and biases are calculated for all the independent models in the AST file Parameters ---------- filters : filter names for the AST data Keywords -------- progress: bool, optional if set, display a progress bar Returns ------- (cov_mats, biases, completenesses, corr_mats, ifluxes) cov_mats : KxNxN dim numpy array K AST covariance matrices in flux units bias : KxN dim numpy vector K vectors of the biases in each filter completenesses : K dim numpy vector completeness versus model corr_mats : KxNxN dim numpy array K AST correlation matrices ifluxes : KxN dim numpy vector K vectors of the input fluxes in each filter """ # find the stars by using unique values of the magnitude values # in filtername filtername = filters[-1] + '_IN' uvals, ucounts = np.unique(self.data[filtername], return_counts=True) n_models = len(uvals) # setup the output n_filters = len(filters) all_covs = np.empty((n_models,n_filters,n_filters),dtype=np.float64) all_corrs = np.empty((n_models,n_filters,n_filters),dtype=np.float32) all_biases = np.empty((n_models,n_filters),dtype=np.float64) all_ifluxes = np.empty((n_models,n_filters),dtype=np.float32) all_compls = np.empty((n_models),dtype=np.float32) ast_minmax = np.empty((2,n_filters),dtype=np.float64) ast_minmax[0,:] = 1e99 ast_minmax[1,:] = 1e-99 # loop over the unique set of models and # calculate the covariance matrix using the ASTs for this model good_asts = np.full((n_models),True) if progress is True: it = Pbar(desc='Calculating AST Covariance ' + \ 'Matrices').iterover(list(range(n_models))) else: it = list(range(n_models)) for i in it: # find all the ASTs for this model indxs, = np.where(self.data[filtername] == uvals[i]) n_asts = len(indxs) if n_asts > 5: results = self._calc_ast_cov(indxs, filters, return_all=True) if results: all_covs[i,:,:] = results[0] all_biases[i,:] = results[1] all_corrs[i,:,:] = results[3] all_ifluxes[i,:] = results[5] all_compls[i] = results[6] for k in range(n_filters): ast_minmax[0,k] = min(ast_minmax[0,k],all_ifluxes[i,k]) ast_minmax[1,k] = max(ast_minmax[1,k],all_ifluxes[i,k]) else: good_asts[i] = False indxs, = np.where(good_asts) return (all_covs[indxs,:,:], all_biases[indxs,:], all_compls[indxs], all_corrs[indxs,:,:], all_ifluxes[indxs,:], ast_minmax)
[docs] def process_asts(self, filters): """ Process all the AST results creating average biases and covariance matrices for each model SED. Also, prep for the interpolation by setting up the kd-tree Parameters ---------- filters : filter names for the AST data Returns ------- N/A. """ results = self._calc_all_ast_cov(filters) self._cov_matrices = results[0] self._biases = results[1] self._completenesses = results[2] self._corr_matrices = results[3] self._input_fluxes = results[4] self._minmax_asts = results[5] print('building kd-tree...') self._kdtree = cKDTree(np.log10(self._input_fluxes)) print('...done')
[docs] def __call__(self, sedgrid, generic_absflux_a_matrix=None, progress=True): """ Interpolate the results of the ASTs on the model grid Parameters ---------- sedgrid: beast.core.grid type model grid to interpolate AST results on Returns ------- progress: bool, optional if set, display a progress bar """ flux = sedgrid.seds if generic_absflux_a_matrix is not None: model_absflux_cov = False if generic_absflux_a_matrix is not None: print('using model indepdent absflux cov matrix') else: print('not using any absflux cov matrix') elif ((sedgrid.cov_diag is not None) & (sedgrid.cov_offdiag is not None)): model_absflux_cov = True absflux_cov_diag = sedgrid.cov_diag absflux_cov_offdiag = sedgrid.cov_offdiag print('using model dependent absflux cov matrix') else: model_absflux_cov = False n_models, n_filters = flux.shape n_offdiag = (((n_filters**2)-n_filters)/2) if n_filters != len(self.filters): raise AttributeError('the grid of models does not seem to' + 'be defined with the same number of filters') biases = np.empty((n_models, n_filters), dtype=np.float64) sigmas = np.empty((n_models, n_filters), dtype=np.float64) cov_diag = np.empty((n_models, n_filters), dtype=np.float64) cov_offdiag = np.empty((n_models, n_offdiag), dtype=np.float64) icov_diag = np.empty((n_models, n_filters), dtype=np.float64) icov_offdiag = np.empty((n_models, n_offdiag), dtype=np.float64) q_norm = np.empty((n_models), dtype=np.float64) compls = np.empty((n_models), dtype=float) if progress is True: it = Pbar(desc='Evaluating model').iterover(list(range(n_models))) else: it = list(range(n_models)) for i in it: # AST results are in vega fluxes cur_flux = flux[i,:] # find the 10 nearest neighbors to the model SED result = self._kdtree.query(np.log10(cur_flux),10) dist = result[0] indxs = result[1] # check if the distance is very small, set to a reasonable value tindxs, = np.where(dist < 0.01) if len(tindxs) > 0: dist[tindxs] = 0.01 # compute the interpolated covariance matrix # use the distances to generate weights for the sum dist_weights = 1.0/dist dist_weights /= np.sum(dist_weights) cur_cov_matrix = np.average(self._cov_matrices[indxs,:,:], axis=0, weights=dist_weights) # add in the absflux covariance matrix # unpack off diagonal terms the same way they were packed if model_absflux_cov: m = 0 cur_cov_matrix[n_filters-1,n_filters-1] += \ absflux_cov_diag[i,n_filters-1] for k in range(n_filters-1): cur_cov_matrix[k,k] += absflux_cov_diag[i,k] for l in range(k+1,n_filters): cur_cov_matrix[k,l] += absflux_cov_offdiag[i,m] cur_cov_matrix[l,k] += absflux_cov_offdiag[i,m] m += 1 elif generic_absflux_a_matrix is not None: for k in range(n_filters): for l in range(n_filters): cur_cov_matrix[k,l] += (generic_absflux_a_matrix[k,l]* cur_flux[k]*cur_flux[l]) # compute the interpolated biases biases[i,:] = np.average(self._biases[indxs,:], axis=0, weights=dist_weights) # compute the interpolated completeness compls[i] = np.average(self._completenesses[indxs], weights=dist_weights) # save the straight uncertainties sigmas[i,:] = np.sqrt(np.diagonal(cur_cov_matrix)) # invert covariance matrix inv_cur_cov_matrix = np.linalg.inv(cur_cov_matrix) # save the diagnonal and packed version of non-diagonal terms m = 0 icov_diag[i,n_filters-1] = inv_cur_cov_matrix[n_filters-1, n_filters-1] cov_diag[i,n_filters-1] = cur_cov_matrix[n_filters-1, n_filters-1] for k in range(n_filters-1): icov_diag[i,k] = inv_cur_cov_matrix[k,k] cov_diag[i,k] = cur_cov_matrix[k,k] for l in range(k+1,n_filters): icov_offdiag[i,m] = inv_cur_cov_matrix[k,l] cov_offdiag[i,m] = cur_cov_matrix[k,l] m += 1 # save the log of the determinat for normalization # the ln(det) is calculated and saved as this is what will # be used in the actual calculation # norm = 1.0/sqrt(Q) det = np.linalg.slogdet(cur_cov_matrix) #print(det) if det[0] <= 0: print('something bad happened') print('determinant of covarinace matrix is zero or negative') print(det) q_norm[i] = -0.5*det[1] return (biases, sigmas, compls, q_norm, icov_diag, icov_offdiag, cov_diag, cov_offdiag)