Source code for kpfpipe.models.level0

"""
Level 0 Data Model
"""
# Standard dependencies
import copy
import warnings
import os

# External dependencies
from astropy.io import fits
from astropy.table import Table
import numpy as np
import pandas as pd

from kpfpipe.models.base_model import KPFDataModel
from kpfpipe.models.metadata import KPF_definitions


[docs] class KPF0(KPFDataModel): """ The level 0 KPF data. Initialized with empty fields. Attributes inherited from KPFDataModel, additional attributes below. Attributes: read_methods (dict): Dictionaries of supported parsers. These parsers are used by the base model to read in .fits files from other instruments. Supported parsers: ``KPF``, ``NEID``, ``PARAS`` """ def __init__(self): """ Constructor """ super().__init__() self.level = 0 extensions = copy.copy(KPF_definitions.LEVEL0_EXTENSIONS) python_types = copy.copy(KPF_definitions.FITS_TYPE_MAP) # add empty level0 extensions and empty headers for each extension for key, value in extensions.items(): if key not in ['PRIMARY', 'RECEIPT', 'CONFIG']: if python_types[value] == np.ndarray: atr = np.array([]) else: atr = python_types[value]([]) self.header[key] = fits.Header() else: continue self.create_extension(key, python_types[value]) setattr(self, key, atr) # add level0 header keywords for PRIMARY header self.header_definitions = pd.read_csv(KPF_definitions.LEVEL0_HEADER_FILE) for i, row in self.header_definitions.iterrows(): ext_name = row['Ext'] if ext_name not in self.header.keys(): continue key = row['Keyword'] val = row['Value'] desc = row['Description'] if val is np.nan: val = None if desc is np.nan: desc = None self.header[ext_name][key] = (val, desc) self.read_methods: dict = { 'KPF': self._read_from_KPF, 'NEID': self._read_from_NEID, 'PARAS': self._read_from_PARAS } self.receipt_add_entry('KPF0.__init__', self.__module__, f' ', 'PASS', comment=f'Create L0/2D object') def _read_from_KPF(self, hdul: fits.HDUList) -> None: ''' Parse the HDUL based on KPF standards Args: hdul (fits.HDUList): List of HDUs parsed with astropy. ''' for hdu in hdul: if isinstance(hdu, fits.ImageHDU) or isinstance(hdu, fits.CompImageHDU): if hdu.name not in self.extensions: self.create_extension(hdu.name, np.ndarray) setattr(self, hdu.name, hdu.data) elif isinstance(hdu, fits.BinTableHDU): if hdu.name not in self.extensions: self.create_extension(hdu.name, pd.DataFrame) table = Table(hdu.data).to_pandas() setattr(self, hdu.name, table) elif hdu.name != 'PRIMARY' and hdu.name != 'RECEIPT': warnings.warn("Unrecognized extension {} of type {}".format(hdu.name, type(hdu))) continue self.header[hdu.name] = hdu.header if self.level == 0: self.l0filename = self.filename self.l1filename = self.filename.replace('.fits', '_L1.fits') self.l2filename = self.filename.replace('.fits', '_L2.fits') if self.level == 1: self.l0filename = self.filename.replace('_L1.fits', '.fits') self.l1filename = self.filename self.l2filename = self.filename.replace('_L1.fits', '_L2.fits') if self.level == 2: self.l0filename = self.filename.replace('_L2.fits', '.fits') self.l1filename = self.filename.replace('_L2.fits', '_L1.fits') self.l2filename = self.filename def _read_from_NEID(self, hdul): ''' Parse the HDUL based on NEID standards Args: hdul (fits.HDUList): List of HDUs parsed with astropy. ''' for hdu in hdul: this_header = hdu.header # depending on the name of the HDU, store them with corresponding # keys primary HDU is named 'DATA' if hdu.name == 'PRIMARY': self.header['PRIMARY'] = this_header elif hdu.name == 'DATA': self.create_extension('DATA', np.array) self.data = hdu.data self.DATA = self.data elif hdu.name == 'VARIANCE': self.create_extension('VARIANCE', np.array) self.variance = hdu.data self.VARIANCE = self.variance elif isinstance(hdu, fits.ImageHDU): if hdu.name not in self.extensions.keys(): self.create_extension(hdu.name, np.array) try: setattr(self, hdu.name, hdu.data) except TypeError: # buffer too small for requested array setattr(self, hdu.name, np.array([])) elif isinstance(hdu, fits.BinTableHDU): if hdu.name not in self.extensions.keys(): self.create_extension(hdu.name, pd.DataFrame) table = Table(hdu.data).to_pandas() setattr(self, hdu.name, table) else: warnings.warn('Unrecognized NEID extension {} of type {}'.format(hdu.name, type(hdu))) continue # raise KeyError('Unrecognized NEID extension {} of type {}'.format(hdu.name, type(hdu))) self.header[hdu.name] = this_header def _read_from_PARAS(self, hdul: fits.HDUList, force: bool=True) -> None: ''' Parse the HDUL based on PARAS standards Args: hdul (fits.HDUList): List of HDUs parsed with astropy. ''' for hdu in hdul: if isinstance(hdu, fits.PrimaryHDU): # PARAS data is stored in primary only self.header['DATA'] = hdu.header self.data = hdu.data self.variance = np.zeros_like(self.data) else: raise NameError('cannot recognize HDU {}'.format(hdu.name))
[docs] def info(self): ''' Pretty print information about this data to stdout ''' total_ram_bytes = 0 # <-- New: track total RAM usage if self.filename is not None: print('File name: {}'.format(self.filename)) try: filepath = os.path.join(self.dirname, self.filename) size_bytes = os.path.getsize(filepath) size_mb = size_bytes / (1024 * 1024) print('File size (on disk): {:.1f} MB'.format(size_mb)) except OSError: print('File size: [Could not access file]') else: print('Empty {:s} Data product'.format(self.__class__.__name__)) # a typical command window is 80 in length head_key = '|{:20s} |{:20s} \n{:40}'.format( 'Header Name', '# Cards', '='*80 + '\n' ) for key, value in self.header.items(): row = '|{:20s} |{:20} \n'.format(key, len(value)) head_key += row print(head_key) head = '|{:20s} |{:20s} |{:20s} \n{:40}'.format( 'Extension Name', 'Data Type', 'Data Dimension', '='*80 + '\n' ) for name in self.extensions.keys(): if name == 'PRIMARY': continue ext = getattr(self, name) if isinstance(ext, (np.ndarray, np.generic)): total_ram_bytes += ext.nbytes # <-- New: add array memory row = '|{:20s} |{:20s} |{:20s}\n'.format(name, 'image', str(ext.shape)) head += row elif isinstance(ext, pd.DataFrame): total_ram_bytes += ext.memory_usage(deep=True).sum() # <-- New: add table memory row = '|{:20s} |{:20s} |{:20s}\n'.format(name, 'table', str(len(ext))) head += row print(head) # Print total RAM usage estimate ram_mb = total_ram_bytes / (1024 * 1024) print('Estimated RAM usage: {:.1f} MB'.format(ram_mb))
def _create_hdul(self, compressed=False): ''' Create an hdul in FITS format. This is used by the base model for writing data context to file ''' hdu_list = [] hdu_definitions = self.extensions.items() for key, value in hdu_definitions: if value == fits.PrimaryHDU: head = self.header[key] hdu = fits.PrimaryHDU(header=head) elif value == fits.ImageHDU or value == fits.CompImageHDU: data = getattr(self, key) if data is None or data.size == 0 or data.ndim < 2 or any(dim == 0 for dim in data.shape): ndim = 0 hdu_type = fits.ImageHDU # data = np.array([]) else: ndim = len(data.shape) hdu_type = value if not compressed: hdu_type = fits.ImageHDU if hdu_type == fits.CompImageHDU: kwargs = {'compression_type': KPF_definitions.L0_COMPRESSION_TYPE} self.header[key]['ZIMAGE'] = 'T' self.header[key]['ZCMPTYPE'] = KPF_definitions.L0_COMPRESSION_TYPE self.header[key]['BSCALE'] = 1.0 self.header[key]['BZERO'] = 0.0 else: kwargs = {} self.header[key]['NAXIS'] = ndim if ndim == 0: self.header[key]['NAXIS1'] = 0 else: for d in range(ndim): self.header[key]['NAXIS{}'.format(d+1)] = data.shape[d] head = self.header[key] try: hdu = hdu_type(data=data, header=head, **kwargs) except KeyError as ke: print("KeyError exception raised: -->ke=" + str(ke)) print("Attempting to handle it...") if str(ke) == '\'bool\'': data = data.astype(float) print("------>SHAPE=" + str(data.shape)) hdu = hdu_type(data=data, header=head, **kwargs) else: raise KeyError("A different error...") elif value == fits.BinTableHDU: table = Table.from_pandas(getattr(self, key)) self.header[key]['NAXIS1'] = len(table) head = self.header[key] hdu = fits.BinTableHDU(data=table, header=head) else: print("Can't translate {} into a valid FITS format."\ .format(type(getattr(self, key)))) continue hdu.name = key if hdu.name == 'PRIMARY': hdu_list.insert(0, hdu) else: hdu_list.append(hdu) return hdu_list
if __name__ == "__main__": pass