Source code for modules.order_trace.src.order_mask

# Standard dependencies
"""
    This module defines class OrderMask which inherits from `KPF0_Primitive` and provides methods to perform
    the event to produce order mask data in the recipe.

    Description:
        * Method `__init__`:

            OrderMask constructor, the following arguments are passed to `__init__`,

                - `action (keckdrpframework.models.action.Action)`: `action.args` contains positional arguments and
                  keyword arguments passed by the `OrderRectification` event issued in the recipe:

                    - `action.args[0] (kpfpipe.models.level0.KPF0)`: Instance of `KPF0` containing level 0 data for
                      making order mask based on the order trace result.
                    - `action.args[1] (kpfpipe.models.level0.KPF0)`: Instance of `KPF0` to contain order mask result.
                    - `action.args['data_extension']: (str, optional)`: the name of the extension containing data.
                    - `action.args['trace_file']: (str, optional)`: the path of the file containing order trace result.
                    - `action.args['orderlet_names'] (str|list, optional)`: Name or list of names of all orderlets
                      included in the order trace result. Defaults to 'SCI1'.
                    - `action.args['orderlets_on_image'] (str|list, optional)`: Name or list of names of the orderlets
                      appear on the L0 image. Defaults to None.
                    - `action.args['start_order'] (int, optional)`: Index of the first orderlet of the first order
                      against the first trace included in the order trace data indexed as 0. Defaults to 0.
                    - `action.args['poly_degree']: (str, optional)`: Polynomial degree for order trace curve fitting.
                      Defaults to 3.
                    - `action.args['origin']: (list, optional)`: Origin of the image where the order trace is related
                      to. Defaults to [0, 0]
                    - `action.args['orderlet_widths']: (dict, optional)`: Orderlet widths to replace the edge widths
                      from the order trace file. Defaults to {} or None.
                    - `action.args['orderlet_values']: (dict, optional)`: The values to be assigned to the pixels of
                      each orderlet  of the output, ex. [1, 2, 3, 4, 5] assigns 5 numbers to the pixels of 5 orderlets
                      respectively. [3] assigns all orderlet pixels the same number, i.e. 3. Defaults to 1 for all
                      trace pixels.
                    - `action.args['non_orderlet_value']: (number, optional)`: Value for non orderlet pixels.
                      Defaults to 0.
                    - `action.args['full_coverage']: (number, optional)`: Make order mask to cover entire image width
                      regardless of the horizontal coverage provided by order trace result.  Defaults to 0.

                - `context (keckdrpframework.models.processing_context.ProcessingContext)`: `context.config_path`
                  contains the path of the config file defined for the module of order trace in the master
                  config file associated with the recipe.

            and the following attributes are defined to initialize the object,

                - `img_data (numpy.ndarray)`: raw flux data for order trace mask.
                - `output_level0 (kpfpipe.models.level0.KPF0)`: Instance of `KPF0` to contain order mask result.
                - `orderlet_names (list)`: list of orderlet names per order.
                - `data_ext (str)`: extension of flux data to be processed.
                - `orderlets_on_image (list)`: list of names of the orderlets appearing on the L0 image.
                - `order_trace_data (pandas.DataFrame): order trace data including polynomial coefficients,
                  top/bottom edges and horizontal coverage for each order trace.
                - `config_path (str)`: Path of config file for spectral extraction.
                - `config (configparser.ConfigParser)`: Config context per the file defined by `config_path`.
                - `logger (logging.Logger)`: Instance of logging.Logger.
                - `alg (modules.order_trace.src.alg.OrderMaskAlg)`: Instance of `OrderMaskAlg` which has operation
                  codes for the computation of order mask.

        * Method `__perform`:

            OrderRectification returns the result in `Arguments` object which contains a level 0 data object (`KPF0`)
            with the rectification results replacing the raw image.

    Usage:
        For the recipe, the spectral extraction event is issued like::

            :
            order_names=['GREEN_SKY_FLUX', 'GREEN_SCI_FLUX1', 'GREEN_SCI_FLUX2', 'GREEN_SCI_FLUX3', 'GREEN_CAL_FLUX']
            lev0_data = kpf0_from_fits(input_lev0_flat, data_type=data_type)
            op_data = OrderMask(lev0_data, NULL,
                                orderlet_names=order_names,
                                start_order=-1,
                                trace_file='/data/masters/20230411/kpf_20230411_master_flat_GREEN_CCD.csv',
                                data_extension='GREEN_CCD')
            :
        where `op_data` is level 0 data (`KPF0`) object Wrapped in `Arguments` class object.
"""


import configparser
import pandas as pd
import numpy as np
import os
import numbers
import copy

# Pipeline dependencies
from kpfpipe.primitives.level0 import KPF0_Primitive
from kpfpipe.models.level0 import KPF0

# External dependencies
from keckdrpframework.models.action import Action
from keckdrpframework.models.arguments import Arguments
from keckdrpframework.models.processing_context import ProcessingContext

# Local dependencies
from modules.order_trace.src.alg_order_mask import OrderMaskAlg

# Global read-only variables
DEFAULT_CFG_PATH = 'modules/spectral_extraction/configs/default.cfg'

[docs] class OrderMask(KPF0_Primitive): default_args_val = { 'start_order': 0, 'data_extension': 'DATA', 'trace_file': None, 'poly_degree': 3, 'origin': [0, 0], 'orderlets_on_image': None, 'orderlet_widths': None, 'full_coverage': 0 } NORMAL = 0 VERTICAL = 1 NoRECT = 2 def __init__(self, action: Action, context: ProcessingContext) -> None: # Initialize parent class KPF0_Primitive.__init__(self, action, context) args_keys = [item for item in action.args.iter_kw() if item != "name"] # input argument # action.args[0] is for level 0 fits # action.args[1] is for level 0 flat with order trace result extension input_flux = action.args[0] # kpf0 instance for L0 data self.input_flux = input_flux self.output_level0 = action.args[1] # kpf0 output containing order mask o_names = self.get_args_value('orderlet_names', action.args, args_keys) if o_names is not None and o_names: self.orderlet_names = o_names if isinstance(o_names, list) else [o_names] else: self.orderlet_names = None start_order = self.get_args_value("start_order", action.args, args_keys) # the index of first orderlet self.data_ext = self.get_args_value('data_extension', action.args, args_keys) order_trace_file = self.get_args_value('trace_file', action.args, args_keys) self.orderlets_on_image = action.args["orderlets_on_image"] if "orderlets_on_image" in args_keys \ else self.orderlet_names od_widths = action.args['orderlet_widths'] if 'orderlet_widths' in args_keys else None if od_widths is not None and isinstance(od_widths, list) and od_widths: self.orderlet_widths = od_widths else: self.orderlet_widths = None od_values = action.args['orderlet_values'] if 'orderlet_values' in args_keys else None if od_values is not None and isinstance(od_values, list) and od_values: self.orderlet_values = od_values elif self.orderlet_names is not None: od_v = 1 if od_values is None or isinstance(od_values, list) else od_values self.orderlet_values = [od_v for i in range(len(self.orderlet_names))] else: self.orderlet_values = None if self.orderlet_names is not None and (len(self.orderlet_names) > len(self.orderlet_values)): for idx in range(len(self.orderlet_names) - len(self.orderlet_values)): self.orderlet_values.append(self.orderlet_values[-1]) non_od_value = action.args['non_orderlet_value'] if 'non_orderlet_value' in args_keys else None self.full_coverage = self.get_args_value("full_coverage", action.args, args_keys) # input configuration self.config = configparser.ConfigParser() try: self.config_path = context.config_path['order_trace'] except: self.config_path = DEFAULT_CFG_PATH self.config.read(self.config_path) # start a logger self.logger = None if not self.logger: self.logger = self.context.logger self.logger.info('Loading config from: {}'.format(self.config_path)) # Order trace algorithm setup self.img_data = input_flux[self.data_ext] if input_flux is not None and hasattr(input_flux, self.data_ext) \ else None self.order_trace_data = None order_trace_header = None if order_trace_file and os.path.isfile(order_trace_file): self.order_trace_data = pd.read_csv(order_trace_file, header=0, index_col=0) poly_degree = self.get_args_value('poly_degree', action.args, args_keys) origin = self.get_args_value('origin', action.args, args_keys) order_trace_header = {'STARTCOL': origin[0], 'STARTROW': origin[1], 'POLY_DEG': poly_degree} try: if self.orderlets_on_image is None: self.alg = None else: self.alg = OrderMaskAlg(self.img_data, self.order_trace_data, order_trace_header, order_mask_data=non_od_value, orderlet_names=self.orderlet_names, start_order=start_order, full_coverage = self.full_coverage, config=self.config, logger=self.logger) except Exception as e: self.alg = None def _pre_condition(self) -> bool: """ Check for some necessary pre conditions """ return True def _post_condition(self) -> bool: """ Check for some necessary post conditions """ return True def _perform(self): """ Primitive action - perform order mask by calling method ``create_order_mask from OrderMaskAlg and create an array of of data in which the pixels covering the traces are masked. Returns: 2D array containing order mask data """ if self.logger: self.logger.info("OrderMask: creating order mask...") if self.alg is None: if self.logger: self.logger.info("OrderMask: no flux data or order trace data or no orderlet on image for "+self.data_ext) return Arguments(None) order_mask_result = None total_orderlet = len(self.orderlet_names) def is_eligible_width_number(w_num): return (isinstance(w_num, numbers.Number) or w_num.isnumeric()) and float(w_num) >= 0.0 if self.orderlet_widths is not None and len(self.orderlet_names) != len(self.orderlet_widths): if self.logger: self.logger.info("OrderMask: size of orderlet width not match the size of orderlet names") self.orderlet_widths = None for idx in range(total_orderlet): order_name = self.orderlet_names[idx] o_width = self.orderlet_widths[idx] if self.orderlet_widths is not None else None if o_width is not None: if not isinstance(o_width, list): # list o_width = [o_width, o_width] if len(o_width) < 1: # empty list o_width = None elif len(o_width) == 1: o_width = [o_width[0], o_width[0]] else: o_width = [o_width[0], o_width[1]] if o_width is not None: o_width = [float(o_width[0]), float(o_width[1])] \ if all([is_eligible_width_number(o_width[i]) for i in range(2)]) else None o_name = order_name if order_name in self.orderlets_on_image else None order_mask_result = self.alg.get_order_mask(o_name, s_width=o_width, trace_value=self.orderlet_values[idx]) assert ('order_mask_result' in order_mask_result and isinstance(order_mask_result['order_mask_result'], np.ndarray)) if self.output_level0 is None: self.output_level0 = copy.deepcopy(self.input_flux) self.output_level0[self.data_ext] = order_mask_result['order_mask_result'] self.output_level0.header['PRIMARY']['IMTYPE'] = 'OrderMask' self.output_level0.receipt_add_entry('OrderMask', self.__module__, f'order mask is created for {self.data_ext}', 'PASS') if self.logger: self.logger.info("OrderMask: Receipt written") return Arguments(self.output_level0) def get_args_value(self, key: str, args: Arguments, args_keys: list): if key in args_keys: v = args[key] else: v = self.default_args_val[key] return v