import numpy as np
import pandas as pd
from os.path import exists
from configparser import ConfigParser
from modules.Utils.config_parser import ConfigHandler
from modules.Utils.alg_base import ModuleAlgBase
[docs]
class CaHKAlg(ModuleAlgBase):
"""Ca H&K spectrum extraction.
This module defines class 'HKExtractionAlg' and methods to extract spectrum from H&K science data.
Args:
data (numpy.ndarray): Ca H&K 2D image data.
fibers (list): List containing the interested fibers to be extracted.
output_exts (list): List with the name of the extension to contain extraction data
output_wl_exts (list): List with the name of the extension to contain wavelength data
config (configparser.ConfigParser): config context.
logger (logging.Logger): Instance of logging.Logger from external application.
Attributes:
instrument (str): Imaging instrument.
hk_data (numpy.ndarray): Numpy array storing 2d image data.
fibers (list): List storing fibers to be processed.
data_range (list): Index range of all pixels.
trace_location (dict): Trace location per order per fiber.
order_buffer (numpy.ndarray): Buffer to contain flux computation.
Raises:
AttributeError: The ``Raises`` section is a list of all exceptions that are relevant to the interface.
TypeError: If there is type error for `data` or `config`.
Exception: If the size of `data` is less than 20 pixels by 20 pixels.
"""
FIT_ERROR_TH = 2.5 # default set per NEID flat
UPPER = 1
LOWER = 0
LOC_X1 = 'x0'
LOC_x2 = 'xf'
LOC_y1 = 'y0'
LOC_y2 = 'yf'
name = 'CaHK'
def __init__(self, data, fibers, output_exts=None, output_wl_exts=None, config=None, logger=None):
if not isinstance(data, np.ndarray) or (data.size == 0):
raise TypeError('image data type error, cannot construct object from CaHKAlg')
if not isinstance(config, ConfigParser):
raise TypeError('config type error, cannot construct object from CaHKAlg')
ModuleAlgBase.__init__(self, self.name, config, logger)
ins = self.config_param.get_config_value('instrument', '').upper() if self.config_param is not None else ''
self.config_ins = ConfigHandler(config, ins, self.config_param) # section of instrument or 'PARAM'
if not fibers and self.config_ins: # get fibers from configuration
fibers = self.config_ins.get_config_value('fibers', '')
if fibers and isinstance(fibers, str):
fibers = fibers.split(',')
if not fibers:
raise TypeError('fiber content error, cannot construct object from CaHKAlg')
self.instrument = ins
self.hk_data = data.astype('float64')
ny, nx = np.shape(data)
self.data_range = [0, ny - 1, 0, nx - 1]
self.fibers = fibers if isinstance(fibers, list) else [str(fibers)]
self.trace_location = {fiber: None for fiber in self.fibers}
self.order_buffer = np.zeros((1, nx), dtype=float)
self.output_exts = output_exts
self.output_wl_exts = output_wl_exts
self.ca_hk_gain = None
[docs]
def get_config_value(self, param: str, default):
"""Get defined value from the config file.
Search the value of the specified property from config section. The default value is returned if no found.
Args:
param (str): Name of the parameter to be searched.
default (str/int/float): Default value for the searched parameter.
Returns:
int/float/str: Value for the searched parameter.
"""
return self.config_ins.get_config_value(param, default)
[docs]
def get_data_range(self):
"""Get image size range
Returns:
numpy.ndarray: image range in order of y1, y2, x1 and x2.
"""
return self.data_range
[docs]
def get_instrument(self):
"""Get imaging instrument.
Returns:
str: Instrument name.
"""
return self.instrument
[docs]
def get_fibers(self):
"""Get imaging fibers
Returns:
list: list with fibers
"""
return self.fibers
def get_output_exts(self):
if not self.output_exts:
self.output_exts = self.get_config_value('hk_extract_exts', self.fibers)
if len(self.output_exts) < len(self.fibers):
for idx in range(len(self.output_exts), len(self.fibers)):
self.output_exts.append(self.fibers[idx])
return self.output_exts
def get_wavelength_exts(self):
if not self.output_wl_exts:
self.output_wl_exts = self.get_config_value('hk_wave_exts', [f+'_wave' for f in self.fibers])
if len(self.output_wl_exts) < len(self.fibers):
for idx in range(len(self.output_wl_exts), len(self.fibers)):
self.output_wl_exts.append(self.fibers[idx]+'_wave')
return self.output_wl_exts
def get_gain(self):
if self.ca_hk_gain is None:
self.ca_hk_gain = self.get_config_value('ca_hk_gain', 1.0)
return self.ca_hk_gain
[docs]
def get_trace_location(self, fiber=None):
"""Get the trace location on specified fibers
Args:
fiber (str): Fiber name. Defaults to None for all fibers
Returns:
dict: fiber location on one fiber or all fibers, like::
{
<fiber name 1>: {<order_1>: {'x1': , 'x2', 'y1': 'y2' },
<order_2>: {'x1': , 'x2', 'y1': 'y2' }, ...},
<fiber name n>: {<order_1>: {'x1': , 'x2', 'y1': 'y2' }, ...}
}
"""
if fiber is None:
return self.trace_location
else:
return {fiber: self.trace_location[fiber] if fiber in self.trace_location else None}
[docs]
def load_trace_location(self, trace_path):
"""Load the file containing trace definition and record the trace information per order and per fiber
Args:
trace_path: the path to a file with order trace information.
The file is assumed in csv format containing the header and
the space as the delimiter for each row.
Returns:
dict: each item in dict object has the trace value for each fiber, like::
{
<fiber name>: <fiber_trace>
# where <fiber trace> is a dict containing location for each order,
{<order_number_1>: {'x1': , 'y1':, 'x2': , 'y2': },
<order_number_2>: {'x1': , 'y1':, 'x2': , 'y2': }, .....,
<order_number_n>: {'x1': , 'y1':, 'x2': , 'y2': }}
}
"""
if not exists(trace_path):
return None
loc_result = pd.read_csv(trace_path, sep=' ')
loc_vals = np.array(loc_result.values)
loc_cols = np.array(loc_result.columns)
order_col_name = 'order'
fiber_col_name = 'fiber'
loc_col_names = [self.LOC_X1, self.LOC_y1, self.LOC_x2, self.LOC_y2]
loc_idx = {c: np.where(loc_cols == c)[0][0] for c in loc_col_names}
order_idx = np.where(loc_cols == order_col_name)[0][0]
fiber_idx = np.where(loc_cols == fiber_col_name)[0][0]
for fiber in self.fibers:
loc_for_fiber = loc_vals[np.where(loc_vals[:, fiber_idx] == fiber)[0], :] # rows with the same fiber
self.trace_location[fiber] = dict()
for loc in loc_for_fiber: # add each row from loc_for_fiber to trace_location for fiber
if loc[order_idx] >= 0:
self.trace_location[fiber][loc[order_idx]] = {'x1': loc[loc_idx[self.LOC_X1]],
'x2': loc[loc_idx[self.LOC_x2]],
'y1': loc[loc_idx[self.LOC_y1]],
'y2': loc[loc_idx[self.LOC_y2]]}
self.d_print("CaHKAlg: load trace location on fiber "+fiber + ": " + str(self.trace_location[fiber]))
return self.trace_location
[docs]
def get_spectral_data(self):
"""Get spectral information including data and dimension.
Returns:
tuple: Information of spectral data,
* (*numpy.ndarray*): 2D spectral data.
* **nx** (*int*): Width of the data.
* **ny** (*int*): Height of the data.
"""
ny, nx = np.shape(self.hk_data)
return self.hk_data, nx, ny
[docs]
def get_order_buffer(self):
"""Get a pre-allocated buffer with all zeros to contain sum of spectrum extraction
Returns:
numpy.ndarray: 1 x <spectrum width> array with all zeros
"""
self.order_buffer.fill(0.0)
return self.order_buffer
[docs]
@staticmethod
def write_data_to_dataframe(out_data, fiber_name, extraction_dim):
""" Write Ca H&K extraction result to an instance of Pandas DataFrame.
Args:
out_data (numpy.ndarray): H&K spectrum extraction result. Each row of the array corresponds to the reduced
1D data of one order.
fiber_name (str): Fiber name.
extraction_dim (dict): Dimension in orders for the specified fiber.
Returns:
Pandas.DataFrame: Instance of DataFrame containing the extraction result plus the following attributes:
- *FIBER*: fiber name.
- *Ordern*: Dimension data for order n.
"""
df_result = pd.DataFrame(out_data)
df_result.attrs['FIBER'] = fiber_name
if isinstance(extraction_dim, dict) and bool(extraction_dim):
for order_no in sorted(extraction_dim.keys()):
order_coord = extraction_dim[order_no]
order_dim = ','.join([str(coord) for coord in [order_coord['x1'], order_coord['y1'],
order_coord['x2'], order_coord['y2']]])
df_result.attrs['ORDER'+str(order_no)] = (order_dim, "x1,y1,x2,y2")
return df_result
[docs]
def img_subtraction(self, dark_img, bias_img):
""" Hk image processing by subtracting the dark image and the bias image if existing.
Args:
dark_img (numpy.array): dark image
bias_img (numpy.array): biase image
Returns:
bool: False in case the image size doesn't match, or True.
"""
if dark_img is not None:
if np.shape(self.hk_data) == np.shape(dark_img):
self.hk_data -= dark_img.astype('float64')
else:
return False, "image dimension between the raw image and dark image doesn't match"
if bias_img is not None:
if np.shape(self.hk_data) == np.shape(bias_img):
self.hk_data -= bias_img.astype('float64')
else:
return False, "image dimension between the raw image and bias image doesn't match"
return True, ""
[docs]
def img_scaling(self):
""" Scale the hk data based on the defined gain that converts the image from the count to electron charge """
self.get_gain()
self.hk_data = self.hk_data * self.ca_hk_gain
[docs]
def load_wavelength_table(self, wave_table_file: str, fiber: str):
""" load the csv file with wavelength solution table. The table size is also checked.
Args:
wave_table_file (str): path to the csv table file with hk wavelength solution data
fiber (str): the associated fiber name
Returns:
numpy.array: Array containing the wavelength solution with the size <total_order>*<image width>
"""
if not exists(wave_table_file):
return None
wave_result = pd.read_csv(wave_table_file, header=None, sep=' ', comment='#', engine='python')
wave_vals = np.array(wave_result.values)
if fiber in self.trace_location and self.trace_location[fiber] is not None:
total_order = len(self.trace_location[fiber].keys())
_, pixel_width, _ = self.get_spectral_data()
r, c = np.shape(wave_vals)
if total_order != c or pixel_width != r:
return None
new_wave_vals = np.transpose(wave_vals)
return new_wave_vals