Source code for beast.observationmodel.observations

"""
Defines a generic interface to observation catalog
"""
import numpy as np
from numpy.random import default_rng

from astropy.table import Table, Column
from astropy import units as u

from beast.observationmodel.vega import Vega
from beast.physicsmodel.priormodel import PriorAgeModel, PriorMassModel
from beast.physicsmodel.grid_weights_stars import compute_bin_boundaries

__all__ = ["Observations", "gen_SimObs_from_sedgrid"]


class Observations(object):
    """
    A generic class that interfaces an observation catalog in a standardized way

    Attributes
    ----------
    inputFile : str
        catalog source file
    filters : list
        list of filter names (internal standards)
    filter_aliases : dict
        alias of filter names between internal and external names
    desc : str
        description of the observations
    badvalue : float
        value that tags a bad measurement that should not be used in the fitting
    nObs : int
        number of observations in the catalog
    """

    def __init__(
        self, inputFile, filters, obs_colnames=None, vega_fname=None, desc=None
    ):
        """
        Parameters
        ----------
        inputFile : str
            observation file
        filters : list
            internal filter names of the data
        obs_colnames : list, optional
            filter names in the observed catalog
        vega_fname : str, optional
            name of the file with the vega model spectrum
        desc : str, optional
            description of the observations
        """
        if desc is None:
            self.desc = "GENERIC: %s" % inputFile
        else:
            self.desc = desc
        self.inputFile = inputFile
        self.setFilters(filters)
        self.filter_aliases = {}
        for ik, k in enumerate(filters):
            self.filter_aliases[k] = obs_colnames[ik]
        self.readData()
        self.setVegaFluxes(filters, vega_fname=vega_fname)
        # some bad values smaller than expected
        # in physical flux units
        self.setBadValue(6e-40)

    @property
    def nObs(self):
        return len(self.data)

    def __len__(self):
        return self.nObs
    def __call__(self):
        """ Calling the object will show info """
        self.info()
    def info(self):
        """ Prints some information about the catalog """
        txt = "Data read from {s.inputFile:s}\n"
        if self.desc is not None:
            txt += "Description: {s.desc:s}\n"
        txt += "Number of records: {s.nObs:d}\n\n"
        txt += "Dataset contains:\n"

        for k in list(self.data.keys()):
            txt += "\t {0:s}\n".format(k)

        if self.filters is None:
            txt += "\n No filters given yet!"
        else:
            txt += "\n Using Filters: {s.filters}\n"

        print(txt.format(s=self))
    def __getitem__(self, *args, **kwargs):
        """ get item will generate a subsample """
        return self.data.__getitem__(*args, **kwargs)

    def keys(self):
        """ Returns dataset content names """
        return self.data.keys()

    def setDescription(self, txt):
        self.desc = txt

    def setBadValue(self, val):
        self.badvalue = val

    def getFilters(self):
        return self.filters

    def setFilters(self, filters):
        self.filters = filters
    def setVegaFluxes(self, filters, vega_fname=None):
        """
        Set vega reference fluxes for conversions

        Parameters
        ----------
        filters : list
            list of filters using the internally normalized namings
        vega_fname : str, optional
            name of the file with the vega model spectrum
        """
        # for optimization purposes: pre-compute the vega fluxes
        with Vega(source=vega_fname) as v:
            _, vega_flux, _ = v.getFlux(filters)
        self.vega_flux = vega_flux
    def getFlux(self, num, units=False):
        """
        Flux of an observation computed from normalized vega fluxes

        Parameters
        ----------
        num : int
            index of the star in the catalog to get measurement from
        units : bool
            if set, returns the fluxes with astropy units attached

        Returns
        -------
        flux : ndarray[dtype=float, ndim=1]
            Measured integrated flux values throughout the filters
            in erg/s/cm^2/A
        """
        if self.vega_flux is None:
            raise ValueError("vega_flux not set, can't return fluxes")

        # catalog fluxes are normalized to vega, so scale by the vega fluxes
        d = self.data[num]
        flux = (
            np.array([d[self.filter_aliases[ok]] for ok in self.filters])
            * self.vega_flux
        )

        if units is True:
            return flux * u.erg / (u.s * u.cm * u.cm * u.angstrom)
        else:
            return flux
    def getFluxerr(self, num):
        """returns the error on the flux of an observation from the number of
        counts (not used in the analysis)"""
        fluxerr = np.zeros(len(self.filters), dtype=float)
        for ek, ok in enumerate(self.filters):
            fluxerr[ek] = self.data[ok + "_err"][num]
        return fluxerr

    def getObs(self, num=0):
        """ returns the flux """
        if self.filters is None:
            raise AttributeError("No filter set provided.")

        flux = self.getFlux(num)
        return flux

    def readData(self):
        """ read the dataset from the original source file """
        if isinstance(self.inputFile, str):
            self.data = Table.read(self.inputFile)
        else:
            self.data = self.inputFile

    def iterobs(self):
        """ yield getObs """
        for k in range(self.nObs):
            yield self.getObs(k)

    def enumobs(self):
        for k in range(self.nObs):
            yield k, self.getObs(k)
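
# Example usage of Observations (a minimal sketch, not taken from the BEAST
# docs: the catalog filename, internal filter names, and observed column
# names below are hypothetical and must match your own data).
#
#     filters = ["HST_WFC3_F275W", "HST_WFC3_F336W"]
#     obs_colnames = ["f275w_rate", "f336w_rate"]
#     obs = Observations("phot_catalog.fits", filters, obs_colnames=obs_colnames)
#     print(obs.nObs)       # number of catalog entries
#     sed = obs.getFlux(0)  # fluxes of the first star in erg/s/cm^2/A
#     for k, sed in obs.enumobs():
#         pass              # iterate over (index, SED) pairs
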
def gen_SimObs_from_sedgrid(
    sedgrid,
    sedgrid_noisemodel,
    nsim=100,
    compl_filter="max",
    complcut=None,
    magcut=None,
    ranseed=None,
    vega_fname=None,
    weight_to_use="weight",
    age_prior_model=None,
    mass_prior_model=None,
):
    """
    Generate simulated observations using the physics and observation grids.
    The priors are sampled as they give the ensemble model for the stellar
    and dust distributions (IMF, Av distribution, etc.).  The physics model
    gives the SEDs based on the priors.  The observation model gives the
    noise, bias, and completeness, all of which are used in simulating the
    observations.

    Currently written to only work for the toothpick noisemodel.

    Parameters
    ----------
    sedgrid : grid.SEDgrid instance
        model grid

    sedgrid_noisemodel : beast noisemodel instance
        noise model data

    nsim : int
        number of observations to simulate

    compl_filter : str
        Filter to use for completeness (required for toothpick model).
        Set to max to use the max value in all filters.

    complcut : float (default=None)
        completeness cut for only including model seds above the cut
        where the completeness cut ranges between 0 and 1.

    magcut : float (default=None)
        faint-end magnitude cut for only including model seds brighter
        than the given magnitude in compl_filter.

    ranseed : int
        used to set the seed to make the results reproducible,
        useful for testing

    vega_fname : string
        filename for the vega info, useful for testing

    weight_to_use : string (default='weight')
        Set to either 'weight' (prior+grid), 'prior_weight', 'grid_weight',
        or 'uniform' (this option is valid only when nsim is supplied) to
        choose the weighting for SED selection.

    age_prior_model : dict
        age prior model in the BEAST dictionary format

    mass_prior_model : dict
        mass prior model in the BEAST dictionary format

    Returns
    -------
    simtable : astropy Table
        table giving the simulated observed fluxes as well as the
        physics model parameters
    """
    n_models, n_filters = sedgrid.seds.shape
    flux = sedgrid.seds

    # get the vega fluxes for the filters
    _, vega_flux, _ = Vega(source=vega_fname).getFlux(sedgrid.filters)

    # cache the noisemodel values
    model_bias = sedgrid_noisemodel["bias"]
    model_unc = np.fabs(sedgrid_noisemodel["error"])
    model_compl = sedgrid_noisemodel["completeness"]

    # only use models that have non-zero completeness in all filters
    # zero completeness means the observation model is not defined for that filter/flux
    # if complcut is provided, only use models above that completeness cut
    if complcut is not None:
        finalcomplcut = complcut
    else:
        finalcomplcut = 0.0

    ast_defined = model_compl > finalcomplcut
    sum_ast_defined = np.sum(ast_defined, axis=1)
    goodobsmod = sum_ast_defined >= n_filters

    # completeness from toothpick model so n band completeness values
    # require only 1 completeness value for each model
    # max picked to best "simulate" how the photometry detection is done
    if compl_filter.lower() == "max":
        model_compl = np.max(model_compl, axis=1)
    else:
        short_filters = [
            filter.split(sep="_")[-1].upper() for filter in sedgrid.filters
        ]
        if compl_filter.upper() not in short_filters:
            raise NotImplementedError(
                "Requested completeness filter not present:"
                + compl_filter.upper()
                + "\nPossible filters:"
                + "\n".join(short_filters)
            )

        filter_k = short_filters.index(compl_filter.upper())
        print("Completeness from %s" % sedgrid.filters[filter_k])
        model_compl = model_compl[:, filter_k]

        # if magcut is provided, only use models brighter than the magnitude cut
        # in addition to the non-zero completeness criterion
        if magcut is not None:
            fluxcut_compl_filter = 10 ** (-0.4 * magcut) * vega_flux[filter_k]
            goodobsmod = (goodobsmod) & (flux[:, filter_k] >= fluxcut_compl_filter)

    # initialize the random number generator
    rangen = default_rng(ranseed)

    # if the age and mass prior models are given, use them to determine the
    # total number of stars to simulate
    model_indx = np.arange(n_models)
    if (age_prior_model is not None) and (mass_prior_model is not None):
        nsim = 0
        # logage_range = [min(sedgrid["logA"]), max(sedgrid["logA"])]
        mass_range = [min(sedgrid["M_ini"]), max(sedgrid["M_ini"])]

        # compute the total mass and average mass of a star given the mass_prior_model
        nmass = 100
        masspts = np.logspace(np.log10(mass_range[0]), np.log10(mass_range[1]), nmass)
        mass_prior = PriorMassModel(mass_prior_model)
        massprior = mass_prior(masspts)
        totmass = np.trapz(massprior, masspts)
        avemass = np.trapz(masspts * massprior, masspts) / totmass

        # compute the mass of the remaining stars at each age and
        # simulate the stars assuming everything is complete
        gridweights = sedgrid[weight_to_use]
        gridweights = gridweights / np.sum(gridweights)

        grid_ages = np.unique(sedgrid["logA"])
        age_prior = PriorAgeModel(age_prior_model)
        ageprior = age_prior(grid_ages)
        bin_boundaries = compute_bin_boundaries(grid_ages)
        bin_widths = np.diff(10 ** (bin_boundaries))
        totsim_indx = np.array([], dtype=int)
        for cage, cwidth, cprior in zip(grid_ages, bin_widths, ageprior):
            gmods = sedgrid["logA"] == cage
            cur_mass_range = [
                min(sedgrid["M_ini"][gmods]),
                max(sedgrid["M_ini"][gmods]),
            ]
            gmass = (masspts >= cur_mass_range[0]) & (masspts <= cur_mass_range[1])
            curmasspts = masspts[gmass]
            curmassprior = massprior[gmass]
            totcurmass = np.trapz(curmassprior, curmasspts)

            # compute the mass remaining at each age -> this is the mass to simulate
            simmass = cprior * cwidth * totcurmass / totmass
            nsim_curage = int(round(simmass / avemass))

            # simulate the stars at the current age
            curweights = gridweights[gmods]
            if np.sum(curweights) > 0:
                curweights /= np.sum(curweights)
                cursim_indx = rangen.choice(
                    model_indx[gmods], size=nsim_curage, p=curweights
                )

                totsim_indx = np.concatenate((totsim_indx, cursim_indx))

                nsim += nsim_curage
            # totsimcurmass = np.sum(sedgrid["M_ini"][cursim_indx])
            # print(cage, totcurmass / totmass, simmass, totsimcurmass, nsim_curage)

        totsimmass = np.sum(sedgrid["M_ini"][totsim_indx])
        print(f"number total simulated stars = {nsim}; mass = {totsimmass}")
        compl_choice = rangen.random(nsim)
        compl_indx = model_compl[totsim_indx] >= compl_choice
        sim_indx = totsim_indx[compl_indx]
        totcompsimmass = np.sum(sedgrid["M_ini"][sim_indx])
        print(
            f"number of simulated stars w/ completeness = {len(sim_indx)}; "
            f"mass = {totcompsimmass}"
        )
    else:
        # total number of stars to simulate set by command line input
        if weight_to_use == "uniform":
            # sample to get the indices of the picked models
            sim_indx = rangen.choice(model_indx[goodobsmod], nsim)
        else:
            gridweights = sedgrid[weight_to_use][goodobsmod] * model_compl[goodobsmod]
            gridweights = gridweights / np.sum(gridweights)

            # sample to get the indices of the picked models
            sim_indx = rangen.choice(model_indx[goodobsmod], size=nsim, p=gridweights)

        print(f"number of simulated stars = {nsim}")

    # setup the output table
    ot = Table()
    qnames = list(sedgrid.keys())

    # simulated data
    for k, filter in enumerate(sedgrid.filters):
        simflux_wbias = flux[sim_indx, k] + model_bias[sim_indx, k]
        simflux = rangen.normal(loc=simflux_wbias, scale=model_unc[sim_indx, k])

        bname = filter.split(sep="_")[-1].upper()
        fluxname = f"{bname}_FLUX"
        colname = f"{bname}_RATE"
        magname = f"{bname}_VEGA"
        ot[fluxname] = Column(simflux)
        ot[colname] = Column(ot[fluxname] / vega_flux[k])
        pindxs = ot[colname] > 0.0
        nindxs = ot[colname] <= 0.0
        ot[magname] = Column(ot[colname])
        ot[magname][pindxs] = -2.5 * np.log10(ot[colname][pindxs])
        ot[magname][nindxs] = 99.999

        # add in the physical model values in a form similar to
        # the output simulated (physics+obs models) values
        # useful if using the simulated data to interpolate ASTs
        # (e.g. for MATCH)
        fluxname = f"{bname}_INPUT_FLUX"
        ratename = f"{bname}_INPUT_RATE"
        magname = f"{bname}_INPUT_VEGA"
        ot[fluxname] = Column(flux[sim_indx, k])
        ot[ratename] = Column(ot[fluxname] / vega_flux[k])
        pindxs = ot[ratename] > 0.0
        nindxs = ot[ratename] <= 0.0
        ot[magname] = Column(ot[ratename])
        ot[magname][pindxs] = -2.5 * np.log10(ot[ratename][pindxs])
        ot[magname][nindxs] = 99.999

    # model parameters
    for qname in qnames:
        ot[qname] = Column(sedgrid[qname][sim_indx])

    return ot
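
# Example usage of gen_SimObs_from_sedgrid (a minimal sketch; the grid and
# noise-model filenames are hypothetical, and SEDGrid / get_noisemodelcat are
# assumed to be the usual BEAST physics-grid and noise-model readers).
#
#     from beast.physicsmodel.grid import SEDGrid
#     from beast.observationmodel.noisemodel.generic_noisemodel import get_noisemodelcat
#
#     sedgrid = SEDGrid("beast_seds.hd5")
#     noisemodel = get_noisemodelcat("beast_noisemodel.hd5")
#     simtable = gen_SimObs_from_sedgrid(
#         sedgrid, noisemodel, nsim=1000, compl_filter="F475W", ranseed=1234
#     )
#     simtable.write("simulated_obs.fits", overwrite=True)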