"""
Backends to handle the model grids different ways
=================================================
Multiple backends are available to reduce the memory footprint for a
performance cost as small as possible.
Implemented Backends
--------------------
MemoryBackend:
Load everything into memory. Can initiate from variables, a filename,
CacheBackend, and DiskBackend.
CacheBackend:
Load data only at the first request. You can work using only seds or only
model properties without the overhead of loading both. (works with FITS and
HDF files). Offers also dropping part of the data.
DiskBackend:
Works directly with an h5py support, ie., on disk. Cache and reading
are allowed through any way offered by h5py, which becomes very handy
for very low-memory tasks such as doing single star figures.
"""
import sys
from astropy.io import fits
import h5py
import copy
from astropy.table import Table
from astropy.io.misc.hdf5 import read_table_hdf5
from beast.physicsmodel.helpers.gridhelpers import pretty_size_print, isNestedInstance
__all__ = ["GridBackend", "MemoryBackend", "CacheBackend", "DiskBackend"]
def _decodebytestring(a):
"""
Convert to string if input is a bytestring.
Parameters
----------
a : byte or str
string or bytestring
Returns
-------
str
string version of input
"""
if isinstance(a, bytes):
return a.decode()
else:
return a
def _gethdfdatasetmeta(hdfds):
"""
Extract the meta(header) information from the grid dataset in a hdf file.
Done without reading the entire grid into memory.
Parameters
----------
hdfds : h5py dataset
the hdf dataset
Returns
-------
header : dict
dictionary of header information
"""
exclude = ["NROWS", "VERSION", "CLASS", "EXTNAME"]
header = {}
attrs = hdfds.attrs
for k in hdfds.attrs.keys():
if (k not in exclude) & (k[:5] != "FIELD") & (k[:5] != "ALIAS"):
header[k] = _decodebytestring(attrs[k])
return header
[docs]
class GridBackend(object):
"""
How the content of a grid is handled. The idea is to provide enough
flexibility that low-memory footprint can be achieved if needed
This class is a generic implementation that will be derived into
more classes
"""
def __init__(self, *args, **kwargs):
self._filters = None
self._header = None
self.fname = None
self._aliases = {}
@property
def nbytes(self):
""" return the number of bytes of the object """
n = sum(
k.nbytes if hasattr(k, "nbytes") else sys.getsizeof(k)
for k in list(self.__dict__.values())
)
return n
@property
def header(self):
return self._header
def __len__(self):
""" number of models in grid """
return len(self.grid)
[docs]
def keys(self):
""" returns the grid keys"""
if hasattr(self.grid, "keys"):
return list(self.grid.keys())
else:
return []
def _get_type(self, fname):
""" determine the type of the file fname
"""
types = {"fits": "fits", "hdf": "hdf", "hd5": "hdf", "hdf5": "hdf"}
# else:
fext = fname.split(".")[-1]
if fext not in types:
raise ValueError(f"{fext} file type not supported")
else:
return types[fext]
def __repr__(self):
""" print the object and memory usage"""
txt = "{}\n source: {}, \n current memory footprint: {}"
return txt.format(
object.__repr__(self), self.fname, pretty_size_print(self.nbytes)
)
[docs]
def write(self, fname, append=False):
"""
Save the file in a format based on the filename extension
fname: str
filename (incl. path)
"""
# non supported types raise an error in self._get_type
if self._get_type(fname) == "fits":
self.writeFITS(fname)
elif self._get_type(fname) == "hdf":
self.writeHDF(fname, append=append)
[docs]
def writeFITS(self, fname, overwrite=False):
"""
Save to fits file
Parameters
----------
fname: str
filename (incl. path) to export to
overwrite : bool, optional
Set to overwrite the fits file
"""
if (self.lamb is not None) & (self.seds is not None) & (self.grid is not None):
if not isinstance(self.grid, Table):
raise ValueError("Only astropy.Table are supported")
hdulist = fits.HDUList()
hdulist.append(fits.PrimaryHDU(self.lamb))
hdulist.append(fits.ImageHDU(self.seds, name="seds"))
if self.cov_diag is not None:
hdulist.append(fits.ImageHDU(self.cov_diag, name="covdiag"))
if self.cov_offdiag is not None:
hdulist.append(fits.ImageHDU(self.cov_offdiag, name="covoffdiag"))
hdulist.append(fits.BinTableHDU(self.grid, name="grid"))
hdulist.writeto(fname, overwrite=overwrite)
else:
raise ValueError("Full data set not specified (lamb, seds, grid)")
[docs]
def writeHDF(self, fname, append=False):
"""
Save to HDF file
Parameters
----------
fname : str
filename (incl. path)
append : bool, optional (default False)
if set, it will append data to each Array or Table
"""
if (self.lamb is not None) & (self.seds is not None) & (self.grid is not None):
if not isinstance(self.grid, Table):
raise ValueError("Only astropy.Table are supported")
with h5py.File(fname, "w") as hd:
if (not append) or ("seds" not in hd.keys()):
hd["seds"] = self.seds[:]
hd["lamb"] = self.lamb[:]
if self.cov_diag is not None:
hd["covdiag"] = self.cov_diag[:]
if self.cov_offdiag is not None:
hd["covoffdiag"] = self.cov_offdiag[:]
else:
raise Exception("Appending to HDF5 file not supported")
if getattr(self, "filters", None) is not None:
if "filters" not in list(self.header.keys()):
self.header["filters"] = " ".join(self.filters)
# append the table of the grid parameters to the file
self.grid.meta = self.header
self.grid.write(fname, path="grid", format="hdf5", append=True)
else:
raise ValueError("Full data set not specified (lamb, seds, grid)")
def _from_GridBackend(self, b):
"""
_from_GridBackend -- convert from generic backend
Parameters
----------
b: GridBackend or sub class
backend to convert from
"""
self.lamb = b.lamb
self.seds = b.seds
self.grid = b.grid
self._filters = b._filters
self._header = b.header
self._aliases = b._aliases
def _from_DiskBackend(self, b):
"""
convert from DiskBackend
Parameters
----------
b: GridBackend or sub class
backend to convert from
"""
self.lamb = b.lamb.read()
self.seds = b.seds.read()
self.grid = Table(b.grid.read())
self._filters = b._filters[:]
self._header = b.header
self._aliases = b._aliases
[docs]
class MemoryBackend(GridBackend):
""" Instanciate a grid object that has no physical storage
Helps to create new grids on the fly. Because it derives from
ModelGrid, this can be exported on disk too.
"""
def __init__(
self,
lamb,
seds=None,
grid=None,
cov_diag=None,
cov_offdiag=None,
header={},
aliases={},
):
"""
Parameters
----------
lamb : ndarray or str or GridBackend subclass
if ndarray - 1D `float` wavelength of the SEDs (requires seds and grid arguments)
if str - filename of grid on disk
if backend - ref to the given grid
seds : ndarray, optional
2D `float` array of seds
grid : astropy.Table, optional
table of properties associated to each sed
cov_diag : ndarray, optional
2D `float` array (# models, # filters) of the diagonal elements
of the absolute flux covariance matrix
cov_offdiag : ndarray, optional
2D `float` array (# models, # elements) of the off diagonal elements
of the absolute flux covariance matrix
header : dict, optional
if provided, update the grid table header
aliases : dict, optional
if provided, update the grid table aliases
"""
super().__init__()
# read from various formats
if isinstance(lamb, DiskBackend):
self._from_DiskBackend(lamb)
elif isNestedInstance(lamb, GridBackend):
self._from_GridBackend(lamb)
elif isinstance(lamb, (str, bytes)):
self._from_File(lamb)
else:
if (seds is None) | (grid is None):
raise ValueError("seds or grid not passed")
self.lamb = lamb
self.seds = seds
self.grid = grid
if (cov_diag is not None) & (cov_offdiag is not None):
self.cov_diag = cov_diag
self.cov_offdiag = cov_offdiag
else:
self.cov_diag = None
self.cov_offdiag = None
# update header
if self._header is None:
self._header = header
else:
for k, v in list(header.items()):
self.header[k] = v
# update aliases
self._aliases.update(aliases)
self.fname = ":memory:"
@property
def filters(self):
"""filters"""
r = self._header.get("filters", None) or self._header.get("FILTERS", None)
if r is None:
return r
r = r.split()
return [_decodebytestring(tr) for tr in r]
def _from_File(self, fname):
"""
Load the content of a file
Parameters
----------
fname: str
filename (incl. path) to read from
"""
# load_seds - load wavelength and seds
if self._get_type(fname) == "fits":
with fits.open(fname) as f:
extnames = [f[k].header["EXTNAME"].lower() for k in range(1, len(f))]
if "seds" in extnames: # new format
self.lamb = f[0].data
self.seds = f["seds"].data
if "covdiag" in extnames:
self.cov_diag = f["covdiag"].data
else:
self.cov_diag = None
if "covoffdiag" in extnames:
self.cov_offdiag = f["covoffdiag"].data
else:
self.cov_offdiag = None
self._header = f["grid"].header
self.grid = Table(f["grid"].data)
else: # old format (used for stellar atmosphere grids, remove when those updated)
with fits.open(fname) as f:
self.seds = f[0].data[:-1]
self.lamb = f[0].data[-1]
self._header = f[1].header
self.grid = Table.read(fname)
elif self._get_type(fname) == "hdf":
with h5py.File(fname, "r") as s:
self.seds = s["seds"][()]
self.lamb = s["lamb"][()]
if "covdiag" in s.keys():
self.cov_diag = s["covdiag"][()]
else:
self.cov_diag = None
if "covdiag" in s.keys():
self.cov_offdiag = s["covoffdiag"][()]
else:
self.cov_offdiag = None
self.grid = read_table_hdf5(s["grid"])
self._header = self.grid.meta
if "filters" in self._header.keys():
self._header["filters"] = _decodebytestring(self._header["filters"])
[docs]
def copy(self):
""" implement a copy method """
g = MemoryBackend(
copy.deepcopy(self.lamb),
seds=copy.deepcopy(self.seds),
grid=copy.deepcopy(self.grid),
cov_diag=copy.deepcopy(self.cov_diag),
cov_offdiag=copy.deepcopy(self.cov_offdiag),
)
g._filters = copy.deepcopy(self._filters)
g._header = copy.deepcopy(self._header)
g._aliases = copy.deepcopy(self._aliases)
return g
[docs]
class CacheBackend(GridBackend):
"""
Load content from a file only when needed
"""
def __init__(self, fname, *args, **kwargs):
"""
Parameters
----------
fname : str
name of file containing the grid
"""
super().__init__(*args, **kwargs)
self.fname = fname
self._type = self._get_type(fname)
self.clear()
[docs]
def clear(self, attrname=None):
"""clear current cache
Parameters
----------
attrname : str in [lamb, filters, grid, header, lamb, seds]
if provided clear only one attribute
else all cache will be erased
"""
if attrname is None:
self._seds = None
self._lamb = None
self._cov_diag = None
self._cov_offdiag = None
self._filters = None
self._grid = None
self._header = None
self._filters = None
else:
setattr(self, "_{0}".format(attrname), None)
def _load_seds(self):
"""
Load in the SEDs from file if not present
"""
if self._seds is None:
if self._type == "fits":
with fits.open(self.fname) as f:
self._seds = f["seds"].data
elif self._type == "hdf":
with h5py.File(self.fname, "r") as s:
self._seds = s["seds"][()]
def _load_cov_diag(self):
"""
Load in the cov_diag from file if not present
"""
if self._cov_diag is None:
if self._type == "fits":
with fits.open(self.fname) as f:
self._cov_diag = f["covdiag"].data
elif self._type == "hdf":
with h5py.File(self.fname, "r") as s:
self._cov_diag = s["covdiag"][()]
def _load_cov_offdiag(self):
"""
Load in the cov_offdiag from file if not present
"""
if self._cov_offdiag is None:
if self._type == "fits":
with fits.open(self.fname) as f:
self._cov_offdiag = f["covoffdiag"].data
elif self._type == "hdf":
with h5py.File(self.fname, "r") as s:
self._cov_offdiag = s["covoffdiag"][()]
def _load_lamb(self):
"""
Load in the wavelengths from file if not present
"""
if self._lamb is None:
if self._type == "fits":
with fits.open(self.fname) as f:
self._lamb = f[0].data
elif self._type == "hdf":
with h5py.File(self.fname, mode="r") as s:
self._lamb = s["lamb"][()]
def _load_grid(self):
"""
Load in the grid from file if not present
"""
if self._grid is None:
if self._type == "fits":
self._grid = Table.read(self.fname)
elif self._type == "hdf":
self._grid = Table.read(self.fname, path="grid", format="hdf5")
def _load_header(self):
"""
Load in the header of the grid if not present
"""
if self._header is None:
if self._type == "fits":
with fits.open(self.fname) as f:
self._header = f["grid"].header
elif self._type == "hdf":
with h5py.File(self.fname, mode="r") as s:
self._header = _gethdfdatasetmeta(s["grid"])
def _load_filters(self):
"""
Load in the filters from file if not present
"""
if self._filters is None:
self._load_header()
try1 = self._header.get("FILTERS", None)
try2 = self._header.get("filters", None)
self._filters = try1 or try2
if self._filters is not None:
self._filters = self._filters.split()
@property
def seds(self):
self._load_seds()
return self._seds
@seds.setter
def seds(self, value):
self._seds = value
@property
def lamb(self):
self._load_lamb()
return self._lamb
@lamb.setter
def lamb(self, value):
self._lamb = value
@property
def cov_diag(self):
self._load_cov_diag()
return self._cov_diag
@cov_diag.setter
def cov_diag(self, value):
self._cov_diag = value
@property
def cov_offdiag(self):
self._load_cov_offdiag()
return self._cov_offdiag
@cov_offdiag.setter
def cov_offdiag(self, value):
self._cov_offdiag = value
@property
def grid(self):
self._load_grid()
return self._grid
@grid.setter
def grid(self, value):
self._grid = value
@property
def header(self):
self._load_header()
return self._header
@header.setter
def header(self, value):
self._header = value
@property
def filters(self):
self._load_filters()
return self._filters
@filters.setter
def filters(self, value):
self._filters = value
[docs]
def keys(self):
""" return column names when possible, avoid loading when possible """
if hasattr(self._grid, "keys"):
return list(self._grid.keys())
else:
super().keys()
[docs]
def copy(self):
""" implement a copy method """
g = CacheBackend(self.fname)
g._aliases = copy.deepcopy(self._aliases)
if self._grid is not None:
g._grid = copy.deepcopy(self._grid)
if self._seds is not None:
g._seds = copy.deepcopy(self._seds)
if self._lamb is not None:
g._lamb = copy.deepcopy(self._lamb)
if self._cov_diag is not None:
g._cov_diag = copy.deepcopy(self._cov_diag)
if self._cov_offdiag is not None:
g._cov_offdiag = copy.deepcopy(self._cov_offdiag)
if self._header is not None:
g._header = copy.deepcopy(self._header)
if self._filters is not None:
g._filters = copy.deepcopy(self._filters)
return g
[docs]
class DiskBackend(GridBackend):
"""
Reads the data from disk when it is accessed. This supports reading
only a portion (e.g., slices/subsets) of the requsted data.
This allows spectral and SED grids larger than can fit into memory.
Only hdf files supported.
"""
def __init__(self, fname, *args, **kwargs):
super().__init__(*args, **kwargs)
ftype = self._get_type(fname)
if ftype != "hdf":
raise ValueError("Expecting HDF file got {0}".format(ftype))
self.fname = fname
self.store = h5py.File(self.fname, mode="r")
self.seds = self.store["seds"]
self.lamb = self.store["lamb"]
self.grid = self.store["grid"]
self.cov_diag = None
if "covdiag" in self.store.keys():
self.cov_diag = self.store["covdiag"]
if "covoffdiag" in self.store.keys():
self.cov_offdiag = self.store["covoffdiag"]
self._filters = None
self._header = None
self._aliases = {}
@property
def header(self):
if self._header is None:
self._header = _gethdfdatasetmeta(self.grid)
return self._header
@property
def filters(self):
"""load in cache if needed """
if self._filters is None:
if self._header is None:
self._header = _gethdfdatasetmeta(self.grid)
self._filters = self._header.get("FILTERS", None) or self._header.get(
"filters", None
)
if self._filters is not None:
self._filters = self._filters.split()
return self._filters
[docs]
def keys(self):
""" return column names when possible """
return list(self.grid.dtype.fields.keys())
[docs]
def copy(self):
g = DiskBackend(self.fname)
g._aliases = copy.deepcopy(self._aliases)
return g