Source code for beast.observationmodel.observations

""" Defines a generic interface to observation catalog
    This enables to handle non detections, (upper limits one day?), flux and
    magnitude conversions to avoid painful preparation of the dataset

    Data model v2 with limited quantity units handling
"""
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import numpy as np
from scipy.interpolate import interp1d

__all__ = ['Observations', 'FakeObs', 'PhotCharact']

[docs]class Observations(object): """ A generic class that interfaces observation catalog in a standardized way Attributes ---------- inputFile: str catalog source file filters: sequence(str) list of filter names (internal standards) desc: str, optional description of the observations badvalue: float, optional value that tags a bad measurement that should not be used in the fitting. nObs: int number of observations in the catalog """ def __init__(self, inputFile, desc=None): """ Generate a data interface object """ self.inputFile = inputFile self.filters = None self.desc = desc self.readData() self.badvalue = None @property def nObs(self): return self.data.nrows def __len__(self): return self.nObs
[docs] def __call__(self): """ Calling the object will show info """ self.info()
[docs] def info(self): """ Prints some information about the catalog """ txt = "Data read from {s.inputFile:s}\n" if self.desc is not None: txt += "Description: {s.desc:s}\n" txt += "Number of records: {s.nObs:d}\n\n" txt += "Dataset contains:" print("Data read from %s " % self.inputFile) if self.desc is not None: print("Description: %s" % self.desc) print("Number of records: %d" % self.nObs) print("") print("Dataset contains:") for k in list(self.data.keys()): txt += "\t {0:s}\n".format(k) if self.filters is None: txt += '\n No filters given yet!' else: txt += '\n Using Filters: {s.filters}\n' print(txt.format(s=self))
def __getitem__(self, *args, **kwargs): """ get item will generate a subsample """ return self.data.__getitem__(*args, **kwargs)
[docs] def keys(self): """ Returns dataset content names """ return self.data.keys()
[docs] def setDescription(self, txt): self.desc = txt
[docs] def setBadValue(self, val): self.badvalue = val
[docs] def getFilters(self): return self.filters
[docs] def setFilters(self, filters): self.filters = filters
[docs] def getMags(self, num, filters): raise Exception('Do not use as magnitudes') return np.array([ self.data[tt][num] for tt in filters])
[docs] def getErrors(self, num, filters): raise Exception('Do not use as magnitudes') return np.array([ self.data[tt + 'err'][num] for tt in filters])
[docs] def getFlux(self, num): """returns the flux of an observation from the number of counts""" flux = np.empty(len(self.filters), dtype=float) for ek, ok in enumerate(self.filters): flux[ek] = self.data[ok][num] return flux
[docs] def getFluxerr(self, num): """returns the error on the flux of an observation from the number of counts (not used in the analysis)""" fluxerr = np.empty(len(self.filters), dtype=float) for ek, ok in enumerate(self.filters): fluxerr[ek] = self.data[ok + '_err'][num] return fluxerr
[docs] def getObs(self, num=0): """ returns the flux""" if self.filters is None: raise AttributeError('No filter set provided.') flux = self.getFlux(num, self.filters) return flux
[docs] def getObsWithUncertainties(self, num=0): """ returns the flux and uncertainties and the mask of bad values""" assert ( not self.filters is None), "No filter set." mags = self.getMags(num, self.filters) errs = self.getErrors(num, self.filters) if not self.badvalue is None: mask = (mags >= self.badvalue) else: mask = np.zeros(len(mags), dtype=bool) return mags, errs, mask
[docs] def readData(self): """ read the dataset from the original source file """ from ..external.eztables import AstroTable if type(self.inputFile) == str: self.data = AstroTable(self.inputFile) else: self.data = self.inputFile
[docs] def iterobs(self): """ yield getObs """ for k in range(self.nObs): yield self.getObs(k)
[docs] def enumobs(self): for k in range(self.nObs): yield k, self.getObs(k)
#****************** # Code below is not tested/used for sometime (KDG - Jul 2017) # Not clear if any of this code is needed any longer. #******************
[docs]class FakeObs(Observations):
[docs] def getObs(self, num=0, err=0.05): assert ( self.filters is not None), "No filter set." mags = self.getMags(num, self.filters) #errs = np.ones(len(mags), dtype=float) * err errs = self.getErrors(num, self.filters) if self.badvalue is not None: mask = (mags >= self.badvalue) else: mask = np.zeros(len(mags), dtype=bool) return mags, errs, mask
[docs] def readData(self): """ read the dataset from the original source file """ from ..external.eztables import Table self.data = Table(self.inputFile)
def gen_FakeObs_from_sedgrid(sedgrid, nrows, err=0.05, filters=None, save=False): from ..external.eztables import Table from . import grid if type(sedgrid) == str: sedgrid = grid.FileSEDGrid(sedgrid) inds = np.random.randint(0, high=sedgrid.grid.nrows, size=nrows) obsTab = Table() if filters is None: filters = sedgrid.grid.header.FILTERS.split() for e, filt in enumerate(filters): errs = np.random.normal(loc=0., scale=err, size=nrows) obsTab.addCol(filt, (1. + errs) * sedgrid.seds[inds, e]) obsTab.addCol(filt + 'err', err * sedgrid.seds[inds, e]) for key in list(sedgrid.grid.keys()): obsTab.addCol(key, sedgrid.grid[key][inds]) if save is True: return obsTab else: obsTab.write(save, clobber=True, append=False)
[docs]class PhotCharact(object): def __init__(self, fname, filters): self.inputFile = fname self.filters = ['HST_WFC3_F275W', 'HST_WFC3_F336W', 'HST_ACS_WFC_F475W', 'HST_ACS_WFC_F814W', 'HST_WFC3_F110W', 'HST_WFC3_F160W'] self.pars = { 'magin': 'magin', 'comp': 'comp', 'bias': 'bias', 'berr': 'bias_err', 'join': '_'} self.interp_bias = { 'kind': 'linear', 'axis': -1, 'copy': False, 'bounds_error': False, 'fill_value': 0.0 } self.interp_bias_error = { 'kind': 'linear', 'axis': -1, 'copy': False, 'bounds_error': False, 'fill_value': 0.0 } self.interp_comp = { 'kind': 'linear', 'axis': -1, 'copy': False, 'bounds_error': False, 'fill_value': 0.0 } self.readData() self._funcs = [ self.getCharactFilterFunctions(k, output='flux') for k in self.filters ]
[docs] def readData(self): """ read the dataset from the original source file """ from ..external.eztables import AstroTable from .vega import Vega self.data = AstroTable(self.inputFile) #data_filters = [ k.split(self.pars['join'])[0] for k in self.data.keys() if k[-5:] == self.pars['magin'] ] #Data are in Vega magnitudes # Need to use Vega with Vega() as v: #self.vega = vega_f, vega_mag, lamb = v.getMag(self.filters) self.vega = v.getMag(self.filters)
[docs] def get_filter_index(self, fname): for e, k in enumerate(self.filters): if k == fname: return e
[docs] def getCharactFilterFunctions(self, fname, output='flux'): if type(fname) == int: _fname = self.filters[fname] else: _fname = fname if output not in ['mag', 'flux']: raise ValueError("interfrom must be either mag or flux") join = self.pars['join'] bkey = self.pars['bias'] ekey = self.pars['berr'] mkey = self.pars['magin'] m_val = self.data[join.join([_fname, mkey])] b_val = self.data[join.join([_fname, bkey])] e_val = self.data[join.join([_fname, ekey])] if output == 'mag': #interp from mags bias_mag_fn = interp1d(m_val, b_val, **self.interp_bias) bias_err_mag_fn = interp1d(m_val, e_val, **self.interp_bias_error) return (bias_mag_fn, bias_err_mag_fn) else: #vegamag to fluxes #b_val is a delta_mag, does not need to check the vega ref. vega_mag = self.vega[1][self.get_filter_index(_fname)] flux_in = np.power(10., -0.4 * (m_val + vega_mag)) flux_bias = np.power(10., -0.4 * (b_val)) flux_err = b_val * ( 1. - np.power(10., -0.4 * e_val) ) bias_flux_fn = interp1d(flux_in, flux_bias, **self.interp_bias) bias_err_flux_fn = interp1d(flux_in, flux_err, **self.interp_bias_error) self.flux_in = flux_in self.flux_bias = flux_bias self.flux_err = flux_err return (bias_flux_fn, bias_err_flux_fn)
[docs] def get_bias_of_sed(self, sed, **kwargs): if np.ndim(sed) > 1: nlamb = np.shape(sed)[1] if nlamb != len(self.filters): raise ValueError('expecting {0} values per sed, got {1}'.format(len(self.filters), nlamb)) biases = np.empty(sed.shape, dtype=float) errors = np.empty(sed.shape, dtype=float) for e, fk in enumerate(self._funcs): biases[e, :] = fk[0](sed[e, :]) errors[e, :] = fk[1](sed[e, :]) else: biases = np.empty(sed.shape, dtype=float) errors = np.empty(sed.shape, dtype=float) for e, fk in enumerate(self._funcs): biases[e] = fk[0](sed[e]) errors[e] = fk[1](sed[e]) return biases, errors