# Source code for kpfpipe.models.base_model

"""
Data models for KPF data
"""
# Standard dependencies
import os
import sys
import copy 
import warnings
import time
from collections import OrderedDict

# External dependencies
import astropy
from astropy.io import fits
from astropy.io.fits import verify
from astropy.io.fits.hdu.image import PrimaryHDU
from astropy.time import Time
from astropy.table import Table
import numpy as np
import pandas as pd
import git
import datetime
import hashlib

# Pipeline dependencies
from kpfpipe.tools.git_tools import *
from kpfpipe.models.metadata.receipt_columns import *
from kpfpipe.models.metadata.config_columns import *
from kpfpipe.models.metadata.KPF_definitions import FITS_TYPE_MAP

class KPFDataModel(object):
    '''The base class for all KPF data models.

    Warning:
        This class (KPFDataModel) should not be used directly.
        Based on the data level of your .fits file, use the appropriate
        level specific data model.

    This is the base model for all KPF data models. Level specific data
    models inherit from this class, so any attribute and method listed here
    applies to all data models.

    Attributes:
        header (dict): a dictionary of headers of each extension (HDU)

            Header stores all header information from the FITS file. Since
            each file is organized into extensions (HDUs), and Astropy parses
            each extension's header cards into a dictionary, this attribute is
            structured as a dictionary of Astropy header objects. The first
            layer is the name of the header, and the second layer is the name
            of the key.

            Note:
                For KPF, FITS extensions are identified by their name

            Examples:
                >>> from kpfpipe.models.level0 import KPF0
                # Assume we have an NEID level 0 file called "level0.fits"
                >>> level0 = KPF0.from_fits('level0.fits', 'NEID')
                # Accessing key 'OBS_TIME' from the 'PRIMARY' HDU
                >>> obs_time = level0.header['PRIMARY']['OBS_TIME']

        receipt (pandas.DataFrame): a table that records the history of this data

            The receipt keeps track of the data process history, so that the
            information stored by this instance can be reproduced from the
            original data. It is structured as a pandas.DataFrame table, with
            each row as an entry.

            Primitives that modify the content of a data product are expected
            to also write to the receipt. Four string inputs from the
            primitive are required: name, module path, any relevant
            parameters, and a status. The receipt will also automatically fill
            in additional information, such as the time of execution, code
            release version, current branch, etc.

            Note:
                It is not recommended to modify the DataFrame directly. Use
                the provided methods to make any adjustments.

            Examples:
                >>> from kpfpipe.models.level0 import KPF0
                >>> data = KPF0()
                # Add an entry into the receipt
                # Four args are required:
                # name_of_primitive, module_path, param, status
                >>> data.receipt_add_entry('primitive1', __name__, 'param1', 'PASS')
                >>> data.receipt
                                         Time  ... Module_Param Status
                0  2020-06-22T15:42:18.360409  ...       param1   PASS

        extensions (dict): a dictionary of extensions.

            This attribute stores any additional information that any
            primitive may wish to record to FITS. Creating an extension
            creates an empty extension of the given type and one may modify
            it directly. Creating an extension will also create a new
            key-value pair in header, so that one can write header keywords
            to the extension. When writing to FITS, extensions are stored in
            the FITS data type as specified in
            kpfpipe.models.metadata.KPF_definitions.FITS_TYPE_MAP
            (image or binary table).

            Whitespace or any symbols that may be interpreted by Python as an
            operator (e.g. -) are not allowed in extension names.

            Examples:
                >>> from kpfpipe.models.level0 import KPF0
                >>> data = KPF0()
                # Add an extension
                # A unique name is required
                >>> data.create_extension('extension1', pd.DataFrame)
                # Access the extension by using its name as an attribute
                # Add a column called 'col1' to the DataFrame
                >>> data.extension1['col1'] = [1, 2, 3]
                >>> data.extension1
                   col1
                0     1
                1     2
                2     3
                # add a key-value pair to the header
                >>> data.header['extension1']['key'] = 'value'
                # delete the extension we just made
                >>> data.del_extension('extension1')

        config (DataFrame): two-column dataframe that stores each line of the
            input configuration file

    '''

    def __init__(self):
        '''
        Constructor
        '''
        # Base name of the file this instance was read from (None until read)
        self.filename = None

        # One Astropy header per extension, keyed by extension name
        self.header = OrderedDict()
        self.header['PRIMARY'] = fits.Header()
        self.header['RECEIPT'] = fits.Header()
        self.header['CONFIG'] = fits.Header()

        # Core tables; the upper-case attribute is an alias of the lower-case
        # one so that item access (which upper-cases the key) finds them.
        self.receipt = pd.DataFrame([], columns=RECEIPT_COL).astype(str)
        self.RECEIPT = self.receipt

        self.config = pd.DataFrame([], columns=CONFIG_COL)
        self.CONFIG = self.config

        self.primary = OrderedDict()
        self.PRIMARY = self.primary

        # Extension name -> FITS HDU type used when writing
        self.extensions = OrderedDict(PRIMARY=fits.PrimaryHDU,
                                      RECEIPT=fits.BinTableHDU,
                                      CONFIG=fits.BinTableHDU)

        # level of data model
        self.level = None  # set in each derived class
        # data_type -> reader callable taking the opened HDU list
        self.read_methods = dict()

    def __getitem__(self, key):
        '''Return the extension stored under the upper-cased ``key``.'''
        return getattr(self, key.upper())

    def __setitem__(self, key, value):
        '''Store ``value`` under the upper-cased ``key``.

        If no extension of that name is registered yet, one is created
        first with the type of ``value``.
        '''
        ext_name = key.upper()
        if ext_name not in self.extensions:
            self.create_extension(ext_name, type(value))
        setattr(self, ext_name, value)

    def __delitem__(self, key):
        '''Delete the extension stored under the upper-cased ``key``.'''
        self.del_extension(key.upper())

# =============================================================================
# I/O related methods
    @classmethod
    def from_fits(cls, fn, data_type='KPF'):
        """Create a data instance from a file

        This method employs the ``read`` method for reading the file. Refer to
        it for more detail.

        Args:
            fn (str): file path (relative to the repository)
            data_type (str): (optional) instrument type of the file [default='KPF']

        Returns:
            cls (data model class): the data instance containing the file content

        Raises:
            IOError: when ``fn`` is not an existing file
        """
        if not os.path.isfile(fn):
            raise IOError(f'{fn} does not exist.')

        this_data = cls()
        # populate it with self.read()
        this_data.read(fn, data_type=data_type)
        return this_data

    @staticmethod
    def _md5_hexdigest(fn, chunk_size=65536):
        """Return the hex MD5 digest of the file at ``fn``, read in chunks."""
        md5 = hashlib.md5()
        with open(fn, 'rb') as f:
            for chunk in iter(lambda: f.read(chunk_size), b""):
                md5.update(chunk)
        return md5.hexdigest()

    def read(self, fn, data_type, overwrite=False):
        """Read the content of a .fits file and populate this
        data structure.

        Args:
            fn (str): file path (relative to the repository)
            data_type (str): instrument type of the file
            overwrite (bool): if this instance is not empty, specifies
                whether to overwrite

        Raises:
            IOError: when an invalid file is presented, when this instance
                already holds data and ``overwrite`` is False, or when
                ``data_type`` has no registered reader

        Note:
            This is not a @classmethod so initialization is
            required before calling this function

        """
        if not fn.endswith('.fits'):
            # Can only read .fits files
            raise IOError('input files must be FITS files')

        if not overwrite and self.filename is not None:
            # This instance already contains data, and
            # we don't want to overwrite
            raise IOError('Cannot overwrite existing data')

        self.filename = os.path.basename(fn)
        self.dirname = os.path.dirname(fn)

        with fits.open(fn) as hdu_list:
            # Handles the Receipt and the auxiliary HDUs
            for hdu in hdu_list:
                if isinstance(hdu, fits.PrimaryHDU):
                    self.header[hdu.name] = hdu.header
                elif isinstance(hdu, fits.BinTableHDU) and not isinstance(hdu, fits.CompImageHDU):
                    df = Table.read(hdu).to_pandas()
                    self.header[hdu.name] = hdu.header
                    if 'RECEIPT' in hdu.name:
                        # Table contains the RECEIPT: make sure every
                        # expected receipt column exists, then expose the
                        # same frame under the lower-case alias as well.
                        df = df.reindex(df.columns.union(RECEIPT_COL, sort=False),
                                        axis=1, fill_value='')
                        setattr(self, hdu.name.lower(), df)
                    setattr(self, hdu.name, df)

            # Leave the rest of HDUs to level specific readers
            if data_type in self.read_methods:
                self.read_methods[data_type](hdu_list)
            else:
                # the provided data_type is not recognized, ie.
                # not in the self.read_methods list
                raise IOError('cannot recognize data type {}'.format(data_type))

        # compute MD5 sum of source file and write it into a receipt entry for tracking.
        # Note that MD5 sum has known security vulnerabilities, but we are only using
        # this to ensure data integrity, and there is no known reason for someone to try
        # to hack astronomical data files. If something more secure is needed,
        # substitute hashlib.sha256 for hashlib.md5
        # The digest must be complete before it is formatted into the entry.
        md5_sum = self._md5_hexdigest(fn)
        self.receipt_add_entry('from_fits', self.__module__, f'fn={fn}', 'PASS',
                               comment=f'md5_sum={md5_sum}')

    def to_fits(self, fn, compressed=False):
        """
        Collect the content of this instance into a monolithic FITS file

        Args:
            fn (str): file path
            compressed (bool): passed through to the level specific
                ``_create_hdul`` method [default=False]

        Raises:
            NameError: when ``fn`` does not end with '.fits'
            TypeError: when this instance has no ``_create_hdul`` method

        Note:
            Can only write to KPF formatted FITS

        """
        if not fn.endswith('.fits'):
            # we only want to write to a '.fits file
            raise NameError('filename must end with .fits')

        # The HDU list is built by the level specific subclass
        gen_hdul = getattr(self, '_create_hdul', None)
        if gen_hdul is None:
            raise TypeError('Write method not found. Is this the base class?')
        hdu_list = gen_hdul(compressed=compressed)

        # Drop the 'OBS FILE' (or else 'PRIMARY') keyword from each header.
        # The original note here ties this to the 80 character hard limit
        # that FITS places on header cards.
        for hdu in hdu_list:
            if 'OBS FILE' in hdu.header.keys():
                del hdu.header['OBS FILE']
            elif 'PRIMARY' in hdu.header.keys():
                del hdu.header['PRIMARY']

        # Add receipt
        # NOTE(review): this entry is added after the HDU list was generated;
        # confirm against _create_hdul whether it is meant to reach the file.
        self.receipt_add_entry('to_fits', self.__module__, f'fn={fn}', 'PASS')

        # finish up writing
        hdul = fits.HDUList(hdu_list)
        out_dir = os.path.dirname(fn)
        if out_dir:
            # A bare file name has no directory part; os.makedirs('') raises.
            os.makedirs(out_dir, exist_ok=True)
        hdul.writeto(fn, overwrite=True, output_verify='silentfix')
        hdul.close()

# =============================================================================
# Receipt related members
    def receipt_add_entry(self, module, mod_path, param, status, chip='all', comment=' '):
        '''
        Add an entry to the receipt

        The entry is appended as a new row, and the DRPTAG and DRPHASH
        keywords of the primary header are set to the git tag and commit hash
        recorded in that row.

        Args:
            module (str): Name of the module making this entry
            mod_path (str): path of the module making this entry
            param (str): param to be recorded
            status (str): status to be recorded
            chip (str): (optional) which ccd [default='all']
            comment (str): (optional) free-form comment [default=' ']

        '''
        # time of execution in ISO format
        timestamp = datetime.datetime.now().isoformat()

        # get version control info (git)
        repo = git.Repo(search_parent_directories=True)
        try:
            git_commit_hash = repo.head.object.hexsha
            git_branch = repo.active_branch.name
            # Sort tags by commit date (latest tag last)
            tags = sorted(repo.tags, key=lambda t: t.commit.committed_datetime)
            git_tag = str(tags[-1]) if tags else ''
        except TypeError:
            # expected if running in testing env
            git_commit_hash = ''
            git_branch = ''
            git_tag = ''
        except (ValueError, BrokenPipeError):
            # ValueError: 12/22/22 new behavior under Docker
            # BrokenPipeError: 1/10/23 behavior under Docker uncovered by
            # hour-long testing
            git_commit_hash = get_git_revision_hash()
            git_branch = get_git_branch()
            git_tag = get_git_tag()

        # add the row to the bottom of the table
        row = {'Time': timestamp,
               'Code_Release': git_tag,
               'Branch_Name': git_branch,
               'Module_Level': str(self.level),
               'Module_Name': module,
               'Module_Param': param,
               'Module_Path': mod_path,
               'Comment': str(comment),
               'Chip': chip,
               'Commit_Hash': git_commit_hash,
               'Status': status}
        self.receipt = pd.concat([self.receipt, pd.DataFrame([row])], ignore_index=True)
        # pd.concat returns a new frame, so the upper-case alias is re-bound
        self.RECEIPT = self.receipt

        # add DRPTAG and DRPHASH to primary header
        self.header['PRIMARY']['DRPTAG'] = git_tag
        self.header['PRIMARY']['DRPHASH'] = git_commit_hash

    def receipt_info(self, receipt_name):
        '''
        Print the short version of the receipt

        Only the 'Time', 'Module_Name' and 'Status' columns are printed.

        Args:
            receipt_name (string): name of the attribute holding the receipt

        '''
        rec = getattr(self, receipt_name)
        # A list selects several columns; a tuple would be looked up as one
        # single column label and raise KeyError.
        msg = rec[['Time', 'Module_Name', 'Status']]
        print(msg)

# =============================================================================
# Auxiliary related extension
    def create_extension(self, ext_name, ext_type=pd.DataFrame):
        '''
        Create a new empty extension to be saved to FITS.
        Will not overwrite an existing extension

        Args:
            ext_name (str): extension name
            ext_type (object): Python object type for this extension. Must be
                one of the values of
                kpfpipe.models.metadata.KPF_definitions.FITS_TYPE_MAP
                (``np.ndarray`` is accepted as an alias of ``np.array``).

        Raises:
            TypeError: when ``ext_type`` is not a known extension type
            NameError: when an extension called ``ext_name`` already exists

        '''
        if ext_type not in FITS_TYPE_MAP.values():
            if ext_type == np.ndarray:
                # np.array is used in its place so that the empty value can
                # be built with ext_type([])
                ext_type = np.array
            else:
                raise TypeError("Unknown extension type {}. Available extension types: {}".format(ext_type, FITS_TYPE_MAP.values()))

        # Python type -> FITS HDU type
        reverse_map = OrderedDict(zip(FITS_TYPE_MAP.values(), FITS_TYPE_MAP.keys()))

        # check whether the extension already exist
        if ext_name in self.extensions.keys() and ext_name in self.__dir__():
            raise NameError('Name {} already exists as extension'.format(ext_name))

        # Start from an empty object of the requested type so that the new
        # extension can be modified directly, as documented.
        setattr(self, ext_name, ext_type([]))
        self.header[ext_name] = fits.Header()
        self.extensions[ext_name] = reverse_map[ext_type]

    def del_extension(self, ext_name):
        '''
        Delete an existing auxiliary extension

        Unknown extension names are ignored.

        Args:
            ext_name (str): extension name

        Raises:
            KeyError: when ``ext_name`` is one of the core extensions

        '''
        # The core extensions are those a freshly constructed base model has
        base = KPFDataModel()
        core_extensions = list(base.header.keys())
        if ext_name in core_extensions:
            raise KeyError('Can not remove any of the core extensions: {}'.format(core_extensions))
        if ext_name not in self.extensions.keys():
            # nothing to delete
            return

        delattr(self, ext_name)
        del self.header[ext_name]
        del self.extensions[ext_name]
if __name__ == '__main__':
    # This module only provides definitions; running it as a script is a no-op.
    pass