Source code for modules.ca_hk.src.ca_hk_extraction

# Standard dependencies 
"""
    This module defines class `CaHKExtraction` which inherits from `KPF0_Primitive` and provides methods to perform the
    event on CA H&K extraction in the recipe.

    Description:
        * Method `__init__`:

            CaHKExtraction constructor, the following arguments are passed to `__init__`,

                - `action (keckdrpframework.models.action.Action)`: `action.args` contains positional arguments and
                  keyword arguments passed by the `OrderTrace` event issued in the recipe:
                    - `action.args[0] (kpfpipe.models.level0.KPF0|str)`: Instance of `KPF0` or the path of a fits file
                      containing image data for H&K extraction.
                    - `action.args[1] (str)`: Path to a file defining the fiber and order location
                    - `action.args[2] (list)`: List containing the fiber names.
                    - `action.args[3] (kpfpipe.models.level1.KPF1)`:  Instance of `KPF1` containing spectral
                      extraction results. If not existing, it is None.
                    - `action.args['output_exts'] (str)`: Extension names of the extensions to contain
                      the extraction result for each fiber, optional, Defaults to fiber list.
                    - `action.args['output_wave_exts'] (str)`: Extension names of the extensions to contain
                      the wavelength solution for each fiber, optional, Defaults to fiber list prefixed with '_wave'.
                    - `action.args['dark'] (KPF0, optional)`: dark master file for dark subtraction. Defaults to None.
                    - `action.args['bias'] (KPF0, optional)`: bias master file for bias subtraction. Defaults to None.
                    - `action.args['wave_files'] (list, optional)`: Wavelength solution files per fiber list.
                      Defaults to None.
                - `context (keckdrpframework.models.processing_context.ProcessingContext)`: `context.config_path`
                  contains the path of the config file defined for the module of hk_spectral_extraction  in the master
                  config file associated with the recipe.

            and the following attributes are defined to initialize the object,
                - `input_img (kpfpipe.models.level0.KPF0)`: Instance of `KPF0`,  assigned by `actions.args[0]`.
                - `fibers (list)`: The interested fibers to be processed, assigned by `actions.args[1]`.
                - `fiber_loc (dict)`: An dictionary instance contains the order location for each fiber.
                - `output_exts (list)`: output extension names.

        * Method `__perform`:

            CaHKExtraction returns the result in `Arguments` object which contains a level 1 data object (`KPF1`)
            with the extraction results.
    Usage:
        For the recipe, the order trace event is issued like::

            :
            lev0_data = kpf0_from_fits(input_lev0_file)
            output_data = kpf1_from_fits(output_lev1_file, data_type = data_type)
            :
            output_data = CaHKExtraction(lev0_data,
                    hk_trace_table,                     # ex. /data/masters/kpfMaster_HKOrderBounds20220909.csv
                    hk_fiber_list,                      # ex. ['sci', 'sky']
                    output_data,                        # lev1 containing Ca H&K extraction
                    output_exts=hk_spec_ext,            # ex. ['CA_HK_SCI', 'CA_HK_SKY']
                    dark=hk_dark_data,                  # lev0 containing dark image
                    wave_files=hk_wavelength_tables)    # ex. ['/data/masters/kpfMaster_HKwave20220909_sci.csv',
                                                        #      '/data/masters/kpfMaster_HKwave20220909_sky.csv']
            :

        where `output_data` is KPF1 object wrapped in `Arguments` class object.
"""

import configparser

# Pipeline dependencies
from kpfpipe.primitives.level0 import KPF0_Primitive
from kpfpipe.models.level0 import KPF0
from kpfpipe.models.level1 import KPF1

# External dependencies
from keckdrpframework.models.action import Action
from keckdrpframework.models.arguments import Arguments
from keckdrpframework.models.processing_context import ProcessingContext

# Local dependencies
from modules.ca_hk.src.alg import CaHKAlg
import numpy as np
from os.path import exists
from astropy.io import fits

# Global read-only variables
DEFAULT_CFG_PATH = 'modules/ca_hk/configs/default_hk.cfg'
CAHK_EXT = 'CA_HK'


[docs] class CaHKExtraction(KPF0_Primitive): def __init__(self, action: Action, context: ProcessingContext) -> None: # Initialize parent class KPF0_Primitive.__init__(self, action, context) args_keys = [item for item in action.args.iter_kw() if item != "name"] # CA_HK data from level 0 data if isinstance(action.args[0], str): img = KPF0.from_fits(action.args[0]) elif isinstance(action.args[0], KPF0): img = action.args[0] else: img = None self.input_img = img[CAHK_EXT] if hasattr(img, CAHK_EXT) else None # trace path self.trace_path = action.args[1] # fiber list if action.args[2] is not None and isinstance(action.args[2], list): self.fibers = action.args[2] elif action.args[2] is not None and isinstance(action.args[2], str): self.fibers = [action.args[2]] else: self.fibers = [] # level 1 data instance for output, existing or not self.output_level1 = action.args[3] if "dark" in args_keys: if isinstance(action.args['dark'], str): img = KPF0.from_fits(action.args['dark']) elif isinstance(action.args['dark'], KPF0): img = action.args['dark'] else: img = None else: img = None self.dark_img = img[CAHK_EXT] if hasattr(img, CAHK_EXT) else None if "bias" in args_keys: if isinstance(action.args['bias'], str): img = KPF0.from_fits(action.args['bias']) elif isinstance(action.args['bias'], KPF0): img = action.args['gias'] else: img = None else: img = None self.bias_img = img[CAHK_EXT] if hasattr(img, CAHK_EXT) else None self.wave_table_files = [] if 'wave_files' in args_keys: if isinstance(action.args['wave_files'], list): self.wave_table_files.extend(action.args['wave_files']) else: self.wave_table_files.append(action.args['wave_files']) # input configuration self.config = configparser.ConfigParser() try: self.config_path = context.config_path['hk_spectral_extraction'] except: self.config_path = DEFAULT_CFG_PATH self.config.read(self.config_path) self.output_exts = [] if 'output_exts' in args_keys: if isinstance(action.args['output_exts'], list): self.output_exts.extend(action.args['output_exts']) elif isinstance(action.args['output_exts'], str): self.output_exts.append(action.args['output_exts']) self.output_wave_exts = [] if 'output_wave_exts' in args_keys: if isinstance(action.args['output_wave_exts'], list): self.output_wave_exts.extend(action.args['output_wave_exts']) elif isinstance(action.args['output_wave_exts'], str): self.output_wave_exts.append(action.args['output_wave_exts']) # start a logger self.logger = None # self.logger = start_logger(self.__class__.__name__, self.config_path) if not self.logger: self.logger = self.context.logger self.logger.info('Loading config from: {}'.format(self.config_path)) # Order trace algorithm setup try: # no trace_path, no ca_hk data, dark image and bias image size not matches that of ca_hk data if (self.trace_path is None) or not exists(self.trace_path) or \ (self.input_img is not None and self.input_img.size == 0) or \ ((self.dark_img is not None) and (self.input_img is not None) and (np.shape(self.dark_img) != np.shape(self.input_img))) or \ ((self.bias_img is not None) and (self.input_img is not None) and (np.shape(self.bias_img) != np.shape(self.input_img))): self.alg = None else: self.alg = CaHKAlg(self.input_img, self.fibers, output_exts = self.output_exts, output_wl_exts = self.output_wave_exts, config=self.config, logger=self.logger) except KeyError: self.alg = None def _pre_condition(self) -> bool: """ Check for some necessary pre conditions """ # input argument must be KPF0 return True def _post_condition(self) -> bool: """ check for some necessary post condition """ return True def _perform(self): """ Primitive action - perform spectral extraction from Ca H&K image based on the trace location information Returns: KPF1 instance """ if self.alg is None: self.logger.warning("CaHKExtraction: no CA_HK data or trace data or wrong dark/bg size") return Arguments(None) # load trace location data self.alg.load_trace_location(self.trace_path) fibers = self.alg.get_fibers() self.output_exts = self.alg.get_output_exts() self.output_wave_exts = self.alg.get_wavelength_exts() result, msg = self.alg.img_subtraction(self.dark_img, self.bias_img) if not result and self.logger: self.logger.warning("CaHKExtraction: dark/bias subtraction error: "+msg) return Arguments(None) self.alg.img_scaling() for idx, fiber in enumerate(fibers): df_ext_result = self.alg.extract_spectrum(fiber) data_df = df_ext_result['spectral_extraction_result'] assert data_df is not None, df_ext_result['message'] self.output_level1 = self.construct_level1_data(data_df, self.output_exts[idx], self.output_level1) if len(self.wave_table_files) > idx: self.build_wavelength_ext(self.wave_table_files[idx], fiber, self.output_wave_exts[idx], self.output_level1) self.output_level1.receipt_add_entry('CaHkExtraction', self.__module__, f'CA_HK extraction to extensions {" ".join(self.output_exts)}', 'PASS') if self.logger: self.logger.warning("CaHkExtraction: Receipt written") if self.logger: self.logger.warning("CaHkExtraction: Done!") return Arguments(self.output_level1) @staticmethod def construct_level1_data(ext_result, ext_name, output_level1): if output_level1 is not None: kpf1_obj = output_level1 else: kpf1_obj = KPF1() if ext_result is not None: total_order, width = np.shape(ext_result.values) else: total_order = 0 # if no data in ext_result, not build data extension and the associated header if total_order > 0: data_ext_name = ext_name.upper() # data = op_result.values kpf1_obj[data_ext_name] = ext_result.values for att in ext_result.attrs: kpf1_obj.header[data_ext_name][att] = ext_result.attrs[att] return kpf1_obj def build_wavelength_ext(self, wave_file, fiber, wave_ext, out_lev1): wave_table = self.alg.load_wavelength_table(wave_file, fiber) if wave_table is not None: out_lev1[wave_ext] = wave_table if self.logger: self.logger.info("CaHkExtraction: write wls to "+wave_ext)