Source code for modules.order_trace.src.order_trace

# Standard dependencies 
"""
    This module defines class `OrderTrace` which inherits from `KPF0_Primitive` and provides methods to perform the
    event on order trace calculation in the recipe.

    Description:
        * Method `__init__`:

            OrderTrace constructor, the following arguments are passed to `__init__`,

                - `action (keckdrpframework.models.action.Action)`: `action.args` contains positional arguments and
                  keyword arguments passed by the `OrderTrace` event issued in the recipe:

                    - `action.args[0] (kpfpipe.models.level0.KPF0)`: Instance of `KPF0` containing image data for order
                      trace extraction.
                    - `action.args['data_row_range'] (list, optional)`: Row coverage of the level 0 data to be
                      processed. If the number is less than 0, it stands for the position relative to the last row
                      of the image.
                    - `action.args['data_col_range'] (list, optional)`: Column coverage of the level 0 data to be
                      processed. If the number is less than 0, it stands for the position relative to the last column
                      of the image.
                    - `action.args['data_extension'] (string, optional)`: name of the extension with the image. Defaults
                      to "data".
                    - `action.args['result_path'] (string, optional)`: name of the file path or data model extension
                      containing the order trace result. Defaults to None.
                    - `action.args['fitting_poly_degree'] (int, optional)`: Order of polynomial used to fit the trace.
                      Defaults to None. This value overrides the number defined in the configuration file for the module
                      if it is defined.
                    - `action.args['is_output_file'] (boolean, optional)`: if the result is output to a file or data
                      model extension. Defaults to True meaning output to a file.
                    - `action.args['orders_ccd'] (int, optional)`: total orders of the ccd. Defaults to -1.
                    - `action.args['do_post'] (boolean, optional)`: do post processing only, on an existing order
                      trace file. Defaults to False.
                    - `action.args['orderlet_gap_pixels'] (number, optional)`: orderlet gap pixels between consecutive
                      orderlets, i.e. number of pixels to ignore between orderlets during extraction. Defaults to 2.
                    - `action.args['overwrite'] (bool, optional)`: overwrite existing order trace file or not.
                      Defaults to False.

                - `context (keckdrpframework.models.processing_context.ProcessingContext)`: `context.config_path`
                  contains the path of the config file defined for the module of order trace  in the master
                  config file associated with the recipe.

            and the following attributes are defined to initialize the object,

                - `input (kpfpipe.models.level0.KPF0)`: Instance of `KPF0`, assigned by `action.args[0]`.
                - `flat_data (numpy.ndarray)`: 2D spectral data associated with `action.args[0]`.
                - `row_range (list)`: Row range of the data to be processed. The row is counted from
                  the first row in case the number is greater than or equal to 0, or from the last row in case
                  the number is less than 0.
                - `col_range (list)`: Column range of the data to be processed. The column is counted from
                  the first column in case the number is greater than or equal to 0, or from the last column
                  in case the number is less than 0.
                - `config_path (str)`: Path of config file for the computation of order trace.
                - `config (configparser.ConfigParser)`: Config context.
                - `logger (logging.Logger)`: Instance of logging.Logger.
                - `result_path (str)`: File or extension containing the output.
                - `alg (modules.order_trace.src.alg.OrderTraceAlg)`: Instance of `OrderTraceAlg` which has operation
                  codes for the computation of order trace.
                - `poly_degree (int)`: Order of polynomial for order trace fitting.
                - `do_post (bool)`: if doing post processing on existing order trace data.
                - `orderlet_gap_pixels (number)`: number of pixels to ignore between orderlets during extraction.
                - `overwrite (bool)`: overwrite existing order trace file or not. Defaults to False.


        * Method `_perform`:

            OrderTrace returns the order trace result as a `pandas.DataFrame` wrapped in an `Arguments`
            object. When `result_path` is set, the result is also written to that file or stored in the
            input level 0 data object (`KPF0`) as an extension of that name.

    Usage:
        For the recipe, the order trace event is issued like::

            :
            flat_data = kpf0_from_fits(input_flat_file, data_type=data_type)
            order_result_data = OrderTrace(flat_data, data_extension='GREEN_CCD_STACK',
                                data_col_range=[0, -1], data_row_range=[0, -1],
                                result_path="/data/masters/20230411/kpf_20230411_master_flat_GREEN_CCD.csv",
                                fitting_poly_degree=3)
            :

        where `order_result_data` is a `pandas.DataFrame` object wrapped in an `Arguments` class object.

"""

import configparser

# Pipeline dependencies
from kpfpipe.primitives.level0 import KPF0_Primitive
from kpfpipe.models.level0 import KPF0

# External dependencies
from keckdrpframework.models.action import Action
from keckdrpframework.models.arguments import Arguments
from keckdrpframework.models.processing_context import ProcessingContext

# Local dependencies
from modules.order_trace.src.alg import OrderTraceAlg
import ast
import numpy as np
import pandas as pd
import os

# Global read-only variables
# Fallback config file for this module; OrderTrace.__init__ uses it when no
# 'order_trace' entry can be read from the processing context's config_path.
DEFAULT_CFG_PATH = 'modules/order_trace/configs/default.cfg'


class OrderTrace(KPF0_Primitive):
    """Recipe primitive that computes (or refines) the order trace of a level 0 image.

    The positional argument ``action.args[0]`` is the input ``KPF0`` object. The
    following keyword arguments are read from ``action.args``; an argument that
    is missing or ``None`` leaves the listed default in place:

        - ``data_extension``: extension of the input holding the image
          (default: ``input.data``).
        - ``data_row_range`` / ``data_col_range``: two-element range (or its
          string form) resolved by `find_range` (default: the full image).
        - ``rows_to_reset`` / ``cols_to_reset``: forwarded to
          ``OrderTraceAlg.locate_clusters`` (default: None).
        - ``result_path``: output file path or extension name (default: None).
        - ``is_output_file``: write to a file (truthy) or to an extension of
          the input (falsy). Defaults to True when not given.
        - ``fitting_poly_degree``: forwarded to ``OrderTraceAlg`` as
          ``poly_degree`` (default: None).
        - ``expected_traces``: forwarded to ``OrderTraceAlg`` (default: None).
        - ``orders_ccd``: forwarded to ``OrderTraceAlg`` (default: -1).
        - ``do_post``: enable post processing (default: False).
        - ``orderlet_gap_pixels``: gap passed to the post-processing /
          refinement steps (default: 2).
        - ``overwrite``: when True, an existing result file is not reused for
          refinement (default: False).
    """

    def __init__(self, action: Action, context: ProcessingContext) -> None:
        # Initialize parent class
        KPF0_Primitive.__init__(self, action, context)

        args_keys = [item for item in action.args.iter_kw() if item != "name"]

        def arg_or(key, default=None):
            """Return keyword argument `key`, or `default` if absent or None."""
            if key in args_keys and action.args[key] is not None:
                return action.args[key]
            return default

        # input argument
        self.input = action.args[0]
        data_extension = arg_or('data_extension')
        if data_extension is not None:
            self.flat_data = self.input[data_extension]
        else:
            self.flat_data = self.input.data

        row, col = np.shape(self.flat_data)

        # Requested ranges are resolved against the image size; the default
        # covers the whole image.
        row_spec = arg_or('data_row_range')
        col_spec = arg_or('data_col_range')
        self.row_range = [0, row - 1] if row_spec is None else self.find_range(row_spec, row)
        self.col_range = [0, col - 1] if col_spec is None else self.find_range(col_spec, col)

        self.cols_to_reset = arg_or('cols_to_reset')
        self.rows_to_reset = arg_or('rows_to_reset')
        self.result_path = arg_or('result_path')
        self.poly_degree = arg_or('fitting_poly_degree')
        self.expected_traces = arg_or('expected_traces')
        self.orders_ccd = arg_or('orders_ccd', -1)
        self.do_post = arg_or('do_post', False)
        self.orderlet_gap_pixels = arg_or('orderlet_gap_pixels', 2)
        self.overwrite = arg_or('overwrite', False)

        # Unlike the other options, an explicitly passed falsy value
        # (including None) is kept as given here.
        self.is_output_file = 'is_output_file' not in args_keys or action.args['is_output_file']

        # input configuration
        self.config = configparser.ConfigParser()
        try:
            self.config_path = context.config_path['order_trace']
        except Exception:
            # Best-effort lookup: any failure falls back to the module default,
            # but KeyboardInterrupt/SystemExit are no longer swallowed.
            self.config_path = DEFAULT_CFG_PATH
        self.config.read(self.config_path)

        # The primitive logs through the logger of the processing context.
        self.logger = self.context.logger
        self.logger.info('Loading config from: {}'.format(self.config_path))

        # Order trace algorithm setup
        self.alg = OrderTraceAlg(self.flat_data, poly_degree=self.poly_degree,
                                 expected_traces=self.expected_traces,
                                 config=self.config, logger=self.logger,
                                 orders_ccd=self.orders_ccd, do_post=self.do_post)

    def _pre_condition(self) -> bool:
        """Check that the input is a `KPF0` and the image is a numpy array."""
        return isinstance(self.input, KPF0) and isinstance(self.flat_data, np.ndarray)

    def _post_condition(self) -> bool:
        """No post condition is enforced; always returns True."""
        return True

    def _perform(self):
        """Primitive action - compute the order trace by calling methods of `OrderTraceAlg`.

        If `result_path` names an existing file, `overwrite` is False and
        `do_post` is True, only the existing result is refined. Otherwise the
        full order trace computation runs and, when `result_path` is set, the
        result is written to that file or stored in the input under that
        extension name (depending on `is_output_file`).

        Returns:
            Arguments: wraps the DataFrame containing the order trace result.
        """
        self.alg.set_data_range([self.col_range[0], self.col_range[1],
                                 self.row_range[0], self.row_range[1]])

        # if order trace result file exists and do_post is True, then process the result only
        refine_only = (self.result_path and os.path.isfile(self.result_path)
                       and (not self.overwrite) and self.do_post)
        if refine_only:
            df = self.alg.refine_order_trace(self.result_path, self.is_output_file,
                                             orderlet_gap=self.orderlet_gap_pixels)

            self.input.receipt_add_entry('OrderTrace', self.__module__,
                                         f'config_path={self.config_path}', 'PASS')
            if self.logger:
                self.logger.info("OrderTrace: refine existing order trace result "+self.result_path)
            if self.logger:
                self.logger.warning("OrderTrace: Refinement is done!")
            return Arguments(df)

        # 1) Locate cluster
        if self.logger:
            self.logger.warning("OrderTrace: locating cluster...")
        cluster_xy = self.alg.locate_clusters(self.rows_to_reset, self.cols_to_reset)

        # 2) assign cluster id and do basic cleaning
        if self.logger:
            self.logger.warning("OrderTrace: assigning cluster id and cleaning...")
        x, y, index = self.alg.form_clusters(cluster_xy['x'], cluster_xy['y'])

        # 3) advanced cleaning and border cleaning
        if self.logger:
            self.logger.warning("OrderTrace: advanced cleaning...")
        new_x, new_y, new_index, all_status = self.alg.advanced_cluster_cleaning_handler(index, x, y)
        new_x, new_y, new_index = self.alg.clean_clusters_on_borders(new_x, new_y, new_index)

        # 4) Merge cluster
        if self.logger:
            self.logger.warning("OrderTrace: merging cluster...")
        c_x, c_y, c_index = self.alg.merge_clusters_and_clean(new_index, new_x, new_y)

        # 5) Find width
        if self.logger:
            self.logger.warning("OrderTrace: finding width...")
        all_widths, cluster_coeffs = self.alg.find_all_cluster_widths(c_index, c_x, c_y,
                                                                      power_for_width_estimation=3)

        # 6) post processing (only the widths are replaced by the post-processed ones)
        if self.do_post:
            if self.logger:
                self.logger.warning('OrderTrace: post processing...')

            post_coeffs, post_widths = self.alg.convert_for_post_process(cluster_coeffs, all_widths)
            _, all_widths = self.alg.post_process(post_coeffs, post_widths,
                                                  orderlet_gap=self.orderlet_gap_pixels)

        # 7) convert result to dataframe
        if self.logger:
            self.logger.warning("OrderTrace: writing cluster into dataframe...")

        df = self.alg.write_cluster_info_to_dataframe(all_widths, cluster_coeffs)
        assert isinstance(df, pd.DataFrame)

        if self.result_path:
            if self.is_output_file:
                # A bare file name has an empty dirname; os.makedirs('') would
                # raise FileNotFoundError, so only create a real directory.
                out_dir = os.path.dirname(self.result_path)
                if out_dir:
                    os.makedirs(out_dir, exist_ok=True)
                df.to_csv(self.result_path)
            else:
                # Store the result as an extension and copy the DataFrame
                # attributes into the header of that extension.
                self.input[self.result_path] = df
                for att in df.attrs:
                    self.input.header[self.result_path][att] = df.attrs[att]

        self.input.receipt_add_entry('OrderTrace', self.__module__,
                                     f'config_path={self.config_path}', 'PASS')
        if self.logger:
            self.logger.info("OrderTrace: Receipt written")

        if self.logger:
            self.logger.warning("OrderTrace: Done!")

        return Arguments(df)

    @staticmethod
    def find_range(range_des, limit):
        """Resolve a two-element range description into absolute indices.

        Args:
            range_des: A two-element list or tuple, or a string that
                evaluates to one via `ast.literal_eval`. Elements below 0 are
                taken relative to `limit`, e.g. -1 becomes ``limit - 1``.
            limit (int): Size of the axis the range refers to.

        Returns:
            list: ``[start, end]`` as ints; ``[0, limit - 1]`` when
            `range_des` is not a two-element list/tuple.

        Raises:
            ValueError, SyntaxError: from `ast.literal_eval` if a string
                description is not a valid Python literal.
        """
        tmp_range = ast.literal_eval(range_des) if isinstance(range_des, str) else range_des

        if isinstance(tmp_range, (list, tuple)) and len(tmp_range) == 2:
            return [int(t) if t >= 0 else int(limit + t) for t in tmp_range]

        return [0, limit - 1]