# KPF recipe pipeline built on the KeckDRPFramework (originally an example
# pipeline used to test the template fitting algorithm module).
from asyncio.log import logger
import os
import signal
import sys
import gc
import time
import glob
from dotenv.main import load_dotenv
from kpfpipe.logger import start_logger
# AST recipe support
import ast
from kpfpipe.pipelines.kpf_parse_ast import KpfPipelineNodeVisitor
import kpfpipe.config.pipeline_config as cfg
# KeckDRPFramework dependencies
from keckdrpframework.pipelines.base_pipeline import BasePipeline
from keckdrpframework.models.arguments import Arguments
from keckdrpframework.models.action import Action
from keckdrpframework.models.processing_context import ProcessingContext
class KPFPipeline(BasePipeline):
    """
    Pipeline to process KPF data using the KeckDRPFramework.

    Recipes are Python-syntax files that are parsed with ``ast`` and walked by a
    ``KpfPipelineNodeVisitor``. Data-processing primitives are dispatched as
    framework actions, and ``resume_recipe`` continues walking the recipe after
    each primitive returns.

    Args:
        context (ProcessingContext): context class provided by the framework

    Attributes:
        name (str): pipeline name; also used as the name of the startup logger.
        event_table (dictionary): Values are tuples, e.g.
            "action_name: (name_of_callable, current_state, next_event_name)"
    """
    name = 'KPF-Pipe'

    # event_table: table of actions known to the framework. All primitives must
    # be registered here. Data reduction primitives are registered into the
    # event_table as part of processing the "from ... import" statement at the
    # top of a recipe. The format of entries is:
    #     action_name: (name_of_callable, current_state, next_event_name)
    event_table = {
        'start_recipe': ('start_recipe', 'starting recipe', 'wait'),
        'resume_recipe': ('resume_recipe', 'resuming recipe', 'wait'),
        'next_file': ('next_file', 'updating file name', 'wait'),
        'to_fits': ('to_fits', 'processing', 'resume_recipe'),
        'kpf0_from_fits': ('kpf0_from_fits', 'processing', 'resume_recipe'),
        'kpf1_from_fits': ('kpf1_from_fits', 'processing', 'resume_recipe'),
        'kpf2_from_fits': ('kpf2_from_fits', 'processing', 'resume_recipe'),
        'wait': ('wait_for_event', 'waiting...', None),
        'exit': ('exit_loop', 'exiting...', None)
    }

    def __init__(self, context: ProcessingContext):
        """
        Initialize the base pipeline and load variables from a ``.env`` file.

        Args:
            context (ProcessingContext): context class provided by the framework
        """
        BasePipeline.__init__(self, context)
        load_dotenv()

    def register_recipe_builtins(self):
        """
        register_recipe_builtins() registers some built-in functions for the recipe to use
        without having to invoke them through the Framework's queue. If additional built-in
        functions are needed, this is the place to add them.

        The supported built-ins are:
            int:
                Same behavior as in Python
            float:
                Same behavior as in Python
            str:
                Same behavior as in Python
            len:
                Same behavior as in Python
            find_files:
                Same behavior as glob.glob in Python, which returns a list of files
                that match the string pattern given as its argument. In particular,
                * expansion is supported.
            split:
                Same behavior as os.path.split in Python. It returns two strings,
                the first representing the directories of a file path, and the second
                representing the simple file name within the directory.
            splitext:
                Same behavior as os.path.splitext in Python. It returns two strings,
                the second being the file extension of a file path, including the dot,
                and the first being everything else.
            dirname:
                Same behavior as os.path.dirname. It returns the directory portion of
                a file path, excluding the file name itself, with no trailing separator.
            exists:
                Same behavior as os.path.exists(). It returns True if the file or
                directory exists at the specified path.
            print:
                Similar behavior to print(), but it only accepts one argument and
                the output is through logger.info(). f-strings are supported.

        Every built-in is registered with an argument count of 1.
        """
        def _recipe_print(value):
            """
            Replacement for Python's built-in print inside a recipe.

            Only one or zero positional arguments are supported (see the
            recipe parser limitation on var-args built-ins).
            """
            self.logger.info(str(value))
            return None

        # (recipe-visible name, callable), registered in this order.
        recipe_builtins = (
            ('int', int),
            ('float', float),
            ('str', str),
            ('len', len),
            ('find_files', glob.glob),
            ('split', os.path.split),
            ('splitext', os.path.splitext),
            ('dirname', os.path.dirname),
            ('exists', os.path.exists),
            ('print', _recipe_print),
        )
        for builtin_name, func in recipe_builtins:
            self._recipe_visitor.register_builtin(builtin_name, func, 1)

    def preload_env(self):
        """
        Make every variable in ``os.environ`` available to the recipe.

        ``load_dotenv()`` is called in ``__init__``, so variables from a ``.env``
        file are expected to already be in ``os.environ`` at this point.
        """
        # NOTE(review): values are logged at debug level; this may expose secrets
        # held in the environment.
        for key, value in os.environ.items():
            self.context.logger.debug(f"_preload_env: {key} <- {value}")
            self._recipe_visitor.load_env_value(key, value)

    def start(self, configfile: str) -> None:
        '''
        Initialize the customized pipeline.

        Customized in that it sets up logger and configurations differently
        from how the BasePipeline does. The pipeline's configuration is
        technically stored in self.context as a ConfigClass() defined by
        keckDRP, but here it is read with ``cfg.ConfigClass``.

        Args:
            configfile (str): path of the pipeline configuration file. It must
                contain ``[ARGUMENT]`` and ``[MODULE_CONFIGS]`` sections.

        Raises:
            IOError: if the ``[ARGUMENT]`` or ``[MODULE_CONFIGS]`` section is missing.
        '''
        self.configfile = configfile
        self.logger = start_logger(self.name, configfile)
        self.logger.info('Pipeline logger started')

        ## Setup argument
        self.config = cfg.ConfigClass()
        self.config.read(configfile)
        try:
            arg = self.config._sections['ARGUMENT']
        except KeyError as err:
            raise IOError('cannot find [ARGUMENT] section in config') from err

        ## Setup primitive-specific configs:
        try:
            self.context.config_path = self.config._sections['MODULE_CONFIGS']
        except KeyError as err:
            raise IOError('cannot find [MODULE_CONFIGS] section in config') from err

        # Add useful attributes onto the self.context object
        self.context.arg = arg
        self.context.pipe_config = self.config
        self.logger.info('Finished initializing Pipeline')

    @staticmethod
    def _read_recipe_source(recipe_file):
        """Return the text of ``recipe_file``, or an empty string when it is None."""
        if recipe_file is None:
            return ''
        with open(recipe_file) as f:
            return f.read()

    @staticmethod
    def _has_arg(args, key):
        """Return True if ``key`` is among the keyword names given by ``args.iter_kw()``."""
        return key in args.iter_kw()

    def _recipe_log_path(self, args, recipe_file):
        """
        Choose the log file path for a recipe run.

        The log file name is derived, in order of preference, from:
            1. ``args['file_path']`` (when it contains '.fits', which is replaced by '.log')
            2. ``args['date_dir']``, as 'pipeline_<date_dir>.log'
            3. the recipe file name, with everything after the first '.' replaced by 'log'
            4. 'pipeline.log' when no recipe file was given

        If the config has ``log_directory`` in its [LOGGER] section, the log is
        placed under that directory. It goes in the ``date_dir`` subdirectory
        when ``date_dir`` is an argument.
        """
        if self._has_arg(args, 'file_path') and '.fits' in args['file_path']:
            log_path = args['file_path'].replace('.fits', '.log')
        elif self._has_arg(args, 'date_dir'):
            log_path = 'pipeline_' + args['date_dir'] + '.log'
        elif recipe_file is not None:
            log_path = os.path.basename(recipe_file).split('.')[0] + '.log'
        else:
            # Previously os.path.basename(None) raised TypeError here.
            log_path = 'pipeline.log'

        # has_option() gives the same answer as "'log_directory' in config['LOGGER']"
        # when the section exists, and returns False instead of raising KeyError
        # when it does not.
        if self.config.has_option('LOGGER', 'log_directory'):
            log_dir = self.config.get('LOGGER', 'log_directory')
            logname = os.path.basename(log_path)
            # Without date_dir, the log goes directly in log_dir; this used to crash.
            if self._has_arg(args, 'date_dir'):
                return os.path.join(log_dir, args['date_dir'], logname)
            return os.path.join(log_dir, logname)
        return log_path

    def start_recipe(self, action, context):
        """
        Starts evaluating the recipe file (Python syntax) specified in action.args.recipe.

        All actions are executed consecutively in the Framework's high priority queue.
        Before processing of the recipe starts, a new log file is opened for this run.
        Built-in functions that recipes can use without enqueuing them to the
        Framework are registered. Values defined in the environment are imported
        so that they are also available to recipes.

        Args:
            action (keckdrpframework.models.action.Action): Keck DRPF Action object
            context (keckdrpframework.models.ProcessingContext.ProcessingContext): Keck DRPF ProcessingContext object

        Returns:
            Arguments: an ``Arguments`` instance named "start_recipe_return".
        """
        recipe_file = getattr(action.args, 'recipe', None)
        # A missing recipe parses as an empty module, so visiting it does nothing.
        self._recipe_ast = ast.parse(self._read_recipe_source(recipe_file))
        context.args = action.args

        # Switch to a log file dedicated to this recipe run.
        log_path = self._recipe_log_path(context.args, recipe_file)
        logname = os.path.basename(log_path)
        self.logger.info("Starting new log with path: {}".format(log_path))
        dirpath = os.path.dirname(log_path)
        if len(dirpath) > 0 and not os.path.exists(dirpath):
            os.makedirs(dirpath, exist_ok=True)
        self.logger = start_logger(logname, self.configfile, log_path=log_path)
        self.context.logger = self.logger
        self.logger.info("*************** Executing recipe {} ***************".format(recipe_file))

        self._recipe_visitor = KpfPipelineNodeVisitor(pipeline=self, context=context)
        self.register_recipe_builtins()

        ## set up environment (best effort: a failure is logged, not fatal)
        try:
            self.preload_env()
        except Exception as e:
            self.logger.error(f"KPF-Pipeline couldn't load environment due to exception {e}")

        self._recipe_visitor.visit(self._recipe_ast)
        return Arguments(name="start_recipe_return")

    def exit_loop(self, action, context):
        """
        Force the Keck DRP Framework to exit the infinite loop.

        Terminates the process immediately with ``os._exit(1)``.

        Args:
            action (keckdrpframework.models.action.Action): Keck DRPF Action object
            context (keckdrpframework.models.ProcessingContext.ProcessingContext): Keck DRPF ProcessingContext object
        """
        self.logger.info("exiting pipeline...")
        # os._exit() does not run cleanup handlers or flush stdio buffers, so flush
        # them here to avoid losing buffered output.
        sys.stdout.flush()
        sys.stderr.flush()
        # NOTE(review): status 1 conventionally signals failure; confirm whether
        # callers rely on it before changing.
        os._exit(1)

    # reentry after call
    def resume_recipe(self, action: Action, context: ProcessingContext):
        """
        Continues evaluating the recipe started in start_recipe().

        resume_recipe() runs immediately after each data processing primitive. It
        makes the previous primitive's return values, stored in an Arguments
        instance in action.args, available back to the recipe.

        Args:
            action (keckdrpframework.models.action.Action): Keck DRPF Action object
            context (keckdrpframework.models.ProcessingContext.ProcessingContext): Keck DRPF ProcessingContext object

        Returns:
            Arguments: an ``Arguments`` instance named "resume_recipe_return".
        """
        # pick up the recipe processing where we left off
        self.logger.debug("resume_recipe")
        self._recipe_visitor.returning_from_call = True
        self._recipe_visitor.awaiting_call_return = False
        self._recipe_visitor.call_output = action.args  # framework put previous output here
        self._recipe_visitor.visit(self._recipe_ast)
        return Arguments(name="resume_recipe_return")  # nothing to actually return, but meet the Framework requirement

    def next_file(self, action: Action, context: ProcessingContext):
        """
        Start the recipe for a new input file.

        The file path is taken from ``action.args['file_path']``. If that lookup
        fails, ``action.args['name']`` is used instead. The name of the file's
        parent directory is stored in ``action.args['date_dir']``, and then
        :meth:`start_recipe` is run.

        Args:
            action (keckdrpframework.models.action.Action): Keck DRPF Action object
            context (keckdrpframework.models.ProcessingContext.ProcessingContext): Keck DRPF ProcessingContext object

        Returns:
            Arguments: an ``Arguments`` instance named "next_file".
        """
        try:
            file_path = action.args['file_path']
        except Exception:
            # The exception a missing key raises depends on the framework's
            # Arguments class, so catch ordinary errors (but, unlike a bare
            # except, not KeyboardInterrupt/SystemExit). Log through the
            # pipeline's logger rather than asyncio's module logger.
            self.logger.info("Defaulting to action.args['name'] for file_path.")
            file_path = action.args['name']
        action.args['date_dir'] = os.path.basename(os.path.dirname(file_path))
        self.start_recipe(action, context)
        return Arguments(name="next_file")

    def wait_for_event(self, action, context):
        """
        Handler for the 'wait' event.

        Runs a garbage-collection pass and then sleeps for one second. In
        ``event_table`` the 'wait' entry has ``None`` as its next event.

        Args:
            action (keckdrpframework.models.action.Action): Keck DRPF Action object
            context (keckdrpframework.models.ProcessingContext.ProcessingContext): Keck DRPF ProcessingContext object

        Returns:
            Arguments: an ``Arguments`` instance named "wait_complete".
        """
        gc.collect()
        time.sleep(1)
        return Arguments(name="wait_complete")

    def _pre(self):
        """No-op hook. NOTE(review): presumably overrides a BasePipeline hook; confirm."""
        pass

    def _post(self):
        """No-op hook. NOTE(review): presumably overrides a BasePipeline hook; confirm."""
        pass