Source code for beast.observationmodel.observations

"""
Defines a generic interface to observation catalog
"""
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import numpy as np

from astropy.table import Table, Column

from beast.observationmodel.vega import Vega

__all__ = ['Observations', 'gen_SimObs_from_sedgrid']


[docs]class Observations(object): """ A generic class that interfaces observation catalog in a standardized way Attributes ---------- inputFile: str catalog source file filters: sequence(str) list of filter names (internal standards) desc: str, optional description of the observations badvalue: float, optional value that tags a bad measurement that should not be used in the fitting. nObs: int number of observations in the catalog """ def __init__(self, inputFile, desc=None): """ Generate a data interface object """ self.inputFile = inputFile self.filters = None self.desc = desc self.readData() self.badvalue = None @property def nObs(self): return self.data.nrows def __len__(self): return self.nObs
[docs] def __call__(self): """ Calling the object will show info """ self.info()
[docs] def info(self): """ Prints some information about the catalog """ txt = "Data read from {s.inputFile:s}\n" if self.desc is not None: txt += "Description: {s.desc:s}\n" txt += "Number of records: {s.nObs:d}\n\n" txt += "Dataset contains:" print("Data read from %s " % self.inputFile) if self.desc is not None: print("Description: %s" % self.desc) print("Number of records: %d" % self.nObs) print("") print("Dataset contains:") for k in list(self.data.keys()): txt += "\t {0:s}\n".format(k) if self.filters is None: txt += '\n No filters given yet!' else: txt += '\n Using Filters: {s.filters}\n' print(txt.format(s=self))
def __getitem__(self, *args, **kwargs): """ get item will generate a subsample """ return self.data.__getitem__(*args, **kwargs)
[docs] def keys(self): """ Returns dataset content names """ return self.data.keys()
[docs] def setDescription(self, txt): self.desc = txt
[docs] def setBadValue(self, val): self.badvalue = val
[docs] def getFilters(self): return self.filters
[docs] def setFilters(self, filters): self.filters = filters
[docs] def getMags(self, num, filters): raise Exception('Do not use as magnitudes') return np.array([self.data[tt][num] for tt in filters])
[docs] def getErrors(self, num, filters): raise Exception('Do not use as magnitudes') return np.array([self.data[tt + 'err'][num] for tt in filters])
[docs] def getFlux(self, num): """returns the flux of an observation from the number of counts""" flux = np.empty(len(self.filters), dtype=float) for ek, ok in enumerate(self.filters): flux[ek] = self.data[ok][num] return flux
[docs] def getFluxerr(self, num): """returns the error on the flux of an observation from the number of counts (not used in the analysis)""" fluxerr = np.empty(len(self.filters), dtype=float) for ek, ok in enumerate(self.filters): fluxerr[ek] = self.data[ok + '_err'][num] return fluxerr
[docs] def getObs(self, num=0): """ returns the flux""" if self.filters is None: raise AttributeError('No filter set provided.') flux = self.getFlux(num, self.filters) return flux
[docs] def readData(self): """ read the dataset from the original source file """ from ..external.eztables import AstroTable if type(self.inputFile) == str: self.data = AstroTable(self.inputFile) else: self.data = self.inputFile
[docs] def iterobs(self): """ yield getObs """ for k in range(self.nObs): yield self.getObs(k)
[docs] def enumobs(self): for k in range(self.nObs): yield k, self.getObs(k)
[docs]def gen_SimObs_from_sedgrid(sedgrid, sedgrid_noisemodel, nsim=100, compl_filter='F475W', ranseed=None, vega_fname=None): """ Generate simulated observations using the physics and observation grids. The priors are sampled as they give the ensemble model for the stellar and dust distributions (IMF, Av distribution etc.). The physics model gives the SEDs based on the priors. The observation model gives the noise, bias, and completeness all of which are used in simulating the observations. Currently written to only work for the toothpick noisemodel. Parameters ---------- sedgrid: grid.SEDgrid instance model grid sedgrid_noisemodel: beast noisemodel instance noise model data nsim : int number of observations to simulate compl_filter : str filter to use for completeness (required for toothpick model) ranseed : int used to set the seed to make the results reproducable useful for testing vega_fname : string filename for the vega info usefule for testing Returns ------- simtable : astropy Table table giving the simulated observed fluxes as well as the physics model parmaeters """ flux = sedgrid.seds n_models, n_filters = flux.shape # hack to get things to run for now short_filters = [filter.split(sep='_')[-1].lower() for filter in sedgrid.filters] if compl_filter.lower() not in short_filters: print('requested completeness filter not present') print('%s requested' % compl_filter.lower()) print('possible filters', short_filters) exit() filter_k = short_filters.index(compl_filter.lower()) print('Completeness from %s' % sedgrid.filters[filter_k]) # cache the noisemodel values model_bias = sedgrid_noisemodel.root.bias[:] model_unc = np.fabs(sedgrid_noisemodel.root.error[:]) model_compl = sedgrid_noisemodel.root.completeness[:] # the combined prior and grid weights # using both as the grid weight needed to account for the finite size # of each grid bin # if we change to interpolating between grid points, need to rethink this gridweights = sedgrid['weight']*model_compl[:, filter_k] # need to sum to 1 gridweights = gridweights/np.sum(gridweights) # set the random seed - mainly for testing if not None: np.random.seed(ranseed) # sample to get the indexes of the picked models indx = range(n_models) sim_indx = np.random.choice(indx, size=nsim, p=gridweights) # get the vega fluxes for the filters _, vega_flux, _ = Vega(source=vega_fname).getFlux(sedgrid.filters) # setup the output table ot = Table() qnames = list(sedgrid.keys()) # simulated data for k, filter in enumerate(sedgrid.filters): colname = '%s_rate' % filter.split(sep='_')[-1].lower() simflux_wbias = flux[sim_indx, k] + model_bias[sim_indx, k] simflux = np.random.normal(loc=simflux_wbias, scale=model_unc[sim_indx, k]) ot[colname] = Column(simflux/vega_flux[k]) # model parmaeters for qname in qnames: ot[qname] = Column(sedgrid[qname][sim_indx]) return ot