Source code for kpfpipe.pipelines.kpf_parse_ast

# kpf_parse_ast.py

from ast import NodeVisitor, parse
import _ast
from collections.abc import Iterable
from queue import Queue
import os
from copy import copy

from keckdrpframework.models.action import Action
from keckdrpframework.models.arguments import Arguments
from keckdrpframework.models.processing_context import ProcessingContext
import configparser as cp

[docs] class RecipeError(Exception): """ RecipeError is raised whenever an error in the recipe or recipe processing is encountered. The most common reason for raising RecipeError is an error in the recipe itself, such as a typo or undefined variable or function name, or the use of syntactic elements from Python that are not supported in recipes. """
[docs] class KpfPipelineNodeVisitor(NodeVisitor): """ KpfPipelineNodeVisitor is a node visitor class derived from ast.NodeVisitor to convert KPF pipeline recipes expressed in python syntax into operations on the KPF Framework. This class implements methods with names of the form visit_<syntax_element>, each of which handles the actual processing of <syntax_element> instances found in a recipe. ast.NodeVisitor implements visit(), which is the code that calls the appropriate visit_<syntax_element> method, or generic_visit() if no such method is found. For recipes containing only supported syntax elements, generic_visit() is never called. There are three class member flags that keep the state of node walking across calls to data processing primitives and loop iterations. They are: self.awaiting_call_return: This flag is set just before the visit_Call() method returns, after queueing a data processing primitive to run in the Framework. Any node visiting method that can have a Call within it must "return" immediately when this flag is set. It is cleared by resume_recipe() after the Framework finishes executing the called primitive. self.returning_from_call: This flag is set by resume_recipe() just before it starts walking the recipe syntax tree, which starts from the top each time. (Each visit_<syntax_element> method is responsible for behaving correctly in the face of these multiple "duplicate" calls, either by redoing work quickly, or by saving state and results of work already done. See the notes about the kpf_completed flag below.) visit_Call() uses this flag to trigger the processing of the return value(s) from the data processing primitive previously enqueued to the framework that resume_recipe() is following This flag distinguishes returning from a finished call from preparing for a new call. self._reset_visited_states: This flag is set by reset_visited_states(), which is called from visit_For() and potentially other loop control methods to be implemented in the future. It causes the visit_<syntax_element> methods to reset their saved state to prepare for another iteration of a loop. In addition, all visit_<syntax_element>() methods doing work that takes a significant amount of time use an internal attribute, kpf_completed, to indicate that the work represented by that node and any of its children has been completed, and as necessary the result has been stored in an attribute of the AST node itself. Some visit_<syntax_element>() methods store additional state and parameters. Those behaviors are covered below in the method documentation. """ def __init__(self, pipeline=None, context=None): """ __init__() constructs an instance of the class that actually walks ("visits") the recipe nodes, after they have been parsed into an Abstract Syntax Tree (AST). The actual work of the pipeline recipe is done by the various visit_<syntax_element> class methods. """ NodeVisitor.__init__(self) # instantiate the parameters dict self._params = None # instantiate the environment dict self._env = {} # store and load stacks # (implemented as lists; use append() and pop()) self._store = list() self._load = list() # KPF framework items self.pipeline = pipeline self.context = context # local state flags self.awaiting_call_return = False self.returning_from_call = False self._reset_visited_states = False # value returned by primitive executed by framework self.call_output = None self._builtins = {} self.subrecipe_depth = 0
[docs] def register_builtin(self, key, func, nargs): """ register_builtin() registers a function so that the function so registered can be called from within a recipe directly, without using the Framework's event queue mechanism. This is useful for recipe support functions like converting floats to ints and splitting strings. Args: key: the name of the built-in function as a string func: the python function itself nargs: the number of input args the function expects Note: Functions that can take a variable number of arguments are not supported by the recipe parser. """ self._builtins[key] = (func, nargs)
def load_env_value(self, key, value): self._env[key] = value
[docs] def visit_Module(self, node): """ visit_Module() processes "module" node of a parsed recipe. A Module node is always at the top of an AST tree returned by ast.parse(), and there is only one per recipe file. The parameters dictionary self._params used to store the values of recipe variables is initialized or reset things when processing the module node, and cleaned up (releasing allocated memory) at the end. Cleaning up has the effect of freeing storage used by variables from a previous pipeline recipe. The recipe parser supports invocation of sub-recipes through the special-case function name "invoke_subrecipe()" (see visit_Call()). Care is taken here to avoid reinitializing the parsing state when processing the "module" node of a sub-recipe. """ if self._reset_visited_states: setattr(node, 'kpf_started', False) for item in node.body: self.visit(item) if self.subrecipe_depth == 0: self._params = None # let storage get collected return self.pipeline.logger.info(f"Module: subrecipe_depth = {self.subrecipe_depth}") if not getattr(node, 'kpf_started', False): if self.subrecipe_depth == 0: self._params = {} setattr(node, 'kpf_started', True) for item in node.body: self.visit(item) if self.awaiting_call_return: return if self.subrecipe_depth == 0: self._params = None # let allocated memory get collected
[docs] def visit_ImportFrom(self, node): """ visit_ImportFrom() processes the "from ... import" statement in a recipe. It causes the imported function names to be added to the pipeline's event_table, and the paths in the "from" section to be added to the Framework's module search path. In combination, these two actions allow data reduction pipeline primitives to be successfully run when they are invoked within a recipe. The actual queueing of a pipeline primitive onto the Framework's event queue occurs in visit_Call(). Note: An "import" clause without a corresponding "from" clause is not supported in recipes, since it would not provide a mechanism to represent a module search path. A bare "import" clause would want to call "visit_Import()", which this class does not implement, so generic_visit() would be called instead. generic_visit() logs and raises an error. Note: Because "from ... import" statements in subrecipes could be imported more than once if the subrecipe is within a loop, inclusing of "from ... import" statements should only appear in the primary recipe, not subrecipes. """ if self._reset_visited_states: setattr(node, 'kpf_completed', False) for name in node.names: self.visit(name) return if not getattr(node, 'kpf_completed', False): module = node.module # append the module path to the framework's primitive_path self.context.config.primitive_path = tuple([*self.context.config.primitive_path, module]) loadQSizeBefore = len(self._load) for name in node.names: self.visit(name) if len(self._load) > loadQSizeBefore: # import the named primitive # This comes as a 2-element tuple from visit_alias # # just add the name to the event_table for now # But we should ensure that the name exists in the module and is Callable tup = self._load.pop() # create an event_table entry that returns control # to the pipeline after running self.pipeline.event_table[tup[0]] = (tup[0], "Processing", "resume_recipe") self.pipeline.logger.info(f"Added {tup[0]} from {module} to event_table") setattr(node, 'kpf_completed', True)
[docs] def visit_alias(self, node): """ visit_alias() processes an "as" alias node, which can only appear as part of an "from ... import ... as ..." construct. The "as" clause is parsed, but ignored, and so should not be used in recipes. visit_alias() puts the name and asname on the _load stack as a tuple. visit_importFrom() handles the heavy lifting. Note: asname is currently not supported and is ignored. """ if self._reset_visited_states: setattr(node, 'kpf_completed', False) return if not getattr(node, 'kpf_completed', False): self._load.append((node.name, node.asname)) self.pipeline.logger.debug(f"alias: {node.name} as {node.asname}") setattr(node, 'kpf_completed', True)
[docs] def visit_Name(self, node): """ visit_Name() processes variable names encountered in a recipe. A Name can occur on either the left or right side of an assignment. If it's on the left side the context is Store, and the Name is the variable name into which to store a value. visit_Name() pushes that variable name on the _store stack. If the Name is on the right side, e.g. as part of an expression, visit_Name() looks up the name in the params dict, and pushes the corresponding value on the _load stack. There are three special cases to be aware of. If the name "None" is encountered, the value None is pushed on the _load stack. If the name "config" is encountered, the config object is pushed on the _load stack. This object supports access to attributes, so a subsequent call to visit_attribute(), which is provided from the "." in an expression like "config.ARGUMENT", will work correctly to extract the ARGUMENT dictionary from the config. The name is looked up in an environment dictionary that has been preloaded from the shell environment that invoked the framework and pipeline. If none of the previous cases produced a successful match, the name is looked up in the internal _params dictionary, which stores by name the values of variables that have been previously encountered in the recipe on the left side of assignment statements. If the name is not found in any of the above places, an error is logged and raised. Note that "config" and keywords from the environment will hide variables defined in recipes of the same name. Implementer note: The same instance of a Name can appear as different nodes in an AST, so nothing should be stored in the node as a node-specific attribute. """ if self._reset_visited_states: return self.pipeline.logger.debug(f"Name: {node.id}") if isinstance(node.ctx, _ast.Store): self.pipeline.logger.debug(f"Name is storing {node.id}") self._store.append(node.id) elif isinstance(node.ctx, _ast.Load): if node.id == "None": value = None elif node.id == "config": if self.pipeline != None and hasattr(self.pipeline, "config"): value = self.pipeline.config else: self.pipeline.logger.error(f"Name: No context or context has no config attribute") raise Exception(f"Name: No context or context has no config attribute") elif node.id == "context": if self.context != None and hasattr(self.context, "args"): value = self.context.args else: self.pipeline.logger.error(f"Name: No context or context has no args attribute") raise Exception(f"Name: No context or context has no args attribute") elif self._env.get(node.id): value = self._env.get(node.id) else: try: value = self._params[node.id] except KeyError: # self.pipeline.logger.error( # f"Name {node.id} on line {node.lineno} of recipe not defined.") raise RecipeError( f"Name {node.id} on line {node.lineno} of recipe not defined. Recipe environment: {self._env}. Python environment: {os.environ}") self.pipeline.logger.debug(f"Name is loading {value} from {node.id}") self._load.append(value) else: raise RecipeError( f"visit_Name: on recipe line {node.lineno}, ctx is unexpected type: {type(node.ctx)}")
[docs] def visit_For(self, node): """ visit_For() processes the "for" node of an AST. Handling looping correctly in a recipe is made more complex because of the need to support calls to data processing primitives within the loop, which cause start_recipe() or resume_recipe() to stop walking the AST tree nodes and return so that the Framework can run the primitive. When resume_recipe() picks up walking the parse tree after the primitive returns, it must be able to resume with the same state as when it was interrupted. Furthermore, state flags and results of operations stored on the various nodes within the loop must be appropriately reset for each iteration, and nesting of loops must work correctly. In addition to using node attributes as described in the documentation for the class as a whole, visit_for() also uses the following attributes stored on the AST node to keep track of the state of loop processing and loop parameters. kpf_started: Processing has started for this "for" node and it's children, but has not completed unless kpf_completed is also set. kpf_params: A local dictionary used to store the values of loop variables and assignment targets. The principal stored parameters are "target", "args_iter" and "current_arg" """ if self._reset_visited_states: setattr(node, 'kpf_completed', False) setattr(node, 'kpf_started', False) if hasattr(node, 'kpf_params'): delattr(node, 'kpf_params') self.visit(node.target) self.visit(node.iter) for subnode in node.body: self.visit(subnode) return if not getattr(node, 'kpf_completed', False): if not getattr(node, 'kpf_started', False): params = {} storeQSizeBefore = len(self._store) self.visit(node.target) if self.awaiting_call_return: return target = self._store.pop() if len(self._store) > storeQSizeBefore else None params['target'] = target loadQSizeBefore = len(self._load) self.visit(node.iter) args = [] if len(self._load) - loadQSizeBefore == 1: item = self._load.pop() if isinstance(item, Iterable): self.pipeline.logger.debug(f"For: Popping first list item, {item}, of type {type(item)}") args = item else: args.insert(0, item) while len(self._load) > loadQSizeBefore: # pick up any additional items item = self._load.pop() self.pipeline.logger.debug(f"For: next list item is {item} of type {type(item)}") args.insert(0, item) args_iter = iter(list(args)) try: current_arg = next(args_iter) self.pipeline.logger.debug(f"For: first call to next returned {current_arg} of type {type(current_arg)}") except StopIteration: current_arg = None params['args_iter'] = args_iter params['current_arg'] = current_arg setattr(node, 'kpf_params', params) setattr(node, 'kpf_started', True) else: params = getattr(node, 'kpf_params', None) assert(params is not None) target = params.get('target') args_iter = params.get('args_iter') current_arg = params.get('current_arg') while current_arg is not None: self.pipeline.logger.debug(f"For: in while loop with current_arg {current_arg}, type {type(current_arg)}") self._params[target] = current_arg for subnode in node.body: self.visit(subnode) if self.awaiting_call_return: return # reset the node visited states for all nodes # underneath this "for" loop to set up for the # next iteration of the loop. self.pipeline.logger.debug("For: resetting visited states before looping") for subnode in node.body: self.reset_visited_states(subnode) # iterate by updating current_arg (and the arg iterator) try: current_arg = next(args_iter) params['current_arg'] = current_arg except StopIteration: break self.pipeline.logger.info(f"Starting For loop on recipe line {node.lineno} with arg {current_arg}") setattr(node, 'kpf_completed', True)
[docs] def visit_Assign(self, node): """ visit_Assign() processes the assignment of one or more constant or calculated values to named variables. The variable names to assign into come from the _store stack, while the values come from the _load stack. Calls to visit_call() may occur in the tree that is walked to generate the value to be assigned, and therefore may set self._awaiting_call_return, in which case visit_Assign() we need to immediately return, and pick up where we left off later, completing the assignment. See also visit_call() and resume_recipe(). """ if self._reset_visited_states: setattr(node, 'kpf_completed', False) setattr(node, 'kpf_completed_targets', False) setattr(node, 'kpf_completed_values', False) if hasattr(node, 'kpf_storeQSizeBefore'): delattr(node, 'kpf_storeQSizeBefore') if hasattr(node, 'kpf_num_targets'): delattr(node, 'kpf_num_targets') for target in node.targets: self.visit(target) self.visit(node.value) return if not getattr(node, 'kpf_completed', False): loadQSizeBefore = len(self._load) storeQSizeBefore = len(self._store) if not getattr(node, 'kpf_completed_targets', False): setattr(node, 'kpf_storeQSizeBefore', storeQSizeBefore) for target in node.targets: self.visit(target) if self.awaiting_call_return: return num_store_targets = len(self._store[storeQSizeBefore:]) setattr(node, "kpf_num_targets", num_store_targets) setattr(node, 'kpf_completed_targets', True) else: num_store_targets = getattr(node, 'kpf_num_targets', 0) if not getattr(node, 'kpf_completed_values', False): self.visit(node.value) if self.awaiting_call_return: return setattr(node, 'kpf_completed_values', True) while num_store_targets > 0 and len(self._load) > loadQSizeBefore: target = self._store.pop() self.pipeline.logger.debug(f"Assign: assignment target is {target}") if target == '_': self._load.pop() # discard else: self._params[target] = self._load.pop() self.pipeline.logger.info(f"Assign: {target} <- {self._params[target]}, type: {self._params[target].__class__.__name__}") num_store_targets -= 1 had_error = False while len(self._store) > storeQSizeBefore: had_error = True self.pipeline.logger.error( f"Assign: unfilled target: {self._store.pop()} on line {node.lineno} of recipe.") while len(self._load) > loadQSizeBefore: had_error = True self.pipeline.logger.error( f"Assign: unused value: {self._load.pop()} on line {node.lineno} of recipe.") if had_error: raise RecipeError( f"Error during assignment on line {node.lineno} of recipe. See log for details.") setattr(node, 'kpf_completed', True)
# UnaryOp and the unary operators
[docs] def visit_UnaryOp(self, node): """ visit_UnaryOp() implements the UnaryOps "-x", "+x" and "not x". The actual work is done in the operator visitor method, e.g. visit_UAdd or visit_USub. Implementor Note: This implementation doesn't support calls within unaryOp expressions, so we don't bother guarding for self.awaiting_call_return here, nor in the Unary Operator methods. """ if self._reset_visited_states: self.visit(node.operand) self.visit(node.op) return self.pipeline.logger.debug(f"UnaryOp:") self.visit(node.operand) self.visit(node.op)
# Unary Operators def _unary_op_impl(self, node, name, func): """ Helper function containing common implementation of unary operators. """ if self._reset_visited_states: return self.pipeline.logger.debug(name) if len(self._load) == 0: raise RecipeError( f"Unary operator {name} invoked on recipe line {name.lineno} with no argument") self._load.append(func(self._load.pop()))
[docs] def visit_UAdd(self, node): """ visit_UAdd() implements the operator for unary +, as in "+x" by invoking the internal method _unary_op_impl() with an appropriate lambda function. See also visit_UnaryOp(). """ self._unary_op_impl(node, "UAdd", lambda x : x)
[docs] def visit_USub(self, node): """ visit_USub() implements the operator for unary -, as in "-x" by invoking the internal method _unary_op_impl() with an appropriate lambda function. See also visit_UnaryOp(). """ self._unary_op_impl(node, "USub", lambda x : -x)
[docs] def visit_Not(self, node): """ visit_Not() implements the operator for not, as in " not x" by invoking the internal method _unary_op_impl() with an appropriate lambda function. See also visit_UnaryOp(). """ self._unary_op_impl(node, "Not", lambda x : not x)
# BinOp, BoolOp and the binary operators
[docs] def visit_BinOp(self, node): """ visit_BinOp() implements binary operations, i.e. "x + y", "x - y", "x * y", "x / y". The actual work is done in the operator visitor method, e.g. visit_Add or visit_Mult. Implementor Note: This implementation doesn't support calls within binOp expressions, so we don't bother guarding for self.awaiting_call_return here, nor in the binary operator methods. """ if self._reset_visited_states: self.visit(node.right) self.visit(node.left) self.visit(node.op) return self.pipeline.logger.debug("BinOp:") # right before left because they're being pushed on a stack, so left comes off first self.visit(node.right) self.visit(node.left) self.visit(node.op)
[docs] def visit_BoolOp(self, node): """ visit_BoolOp() implements boolean binary operations, i.e. "x and y" and "x or y". The actual work is done in the operator visitor method, e.g. visit_And. Implementor Note: This implementation doesn't support calls within boolOp expressions, so we don't bother guarding for self.awaiting_call_return here, nor in the binary operator methods. """ if self._reset_visited_states: for item in reversed(node.values): self.visit(item) self.visit(node.op) return self.pipeline.logger.debug("BoolOp:") # list is reversed because items are being pushed on a stack, so they come off last first for item in reversed(node.values): self.visit(item) self.visit(node.op)
# binary operators def _binary_op_impl(self, node, name, func): """ Helper function containing common implementation of binary operators. """ if self._reset_visited_states: return self.pipeline.logger.debug(name) if len(self._load) < 2: raise RecipeError( f"Binary operator {name} invoked on recipe line {node.lineno} " + f"with insufficient number of arguments {len(self._load)}") self._load.append(func(self._load.pop(), self._load.pop()))
[docs] def visit_Add(self, node): """ visit_Add() implements the binary addition operator by invoking the internal method _binary_op_impl() with an appropriate lambda function. See also visit_BinOp() """ self._binary_op_impl(node, "Add", lambda x, y: x + y)
[docs] def visit_Sub(self, node): """ visit_Sub() implements the binary subtraction operator by invoking the internal method _binary_op_impl() with an appropriate lambda function. See also visit_BinOp() """ self._binary_op_impl(node, "Sub", lambda x, y: x - y)
[docs] def visit_Mult(self, node): """ visit_Mult() implements the binary multiplication operator by invoking the internal method _binary_op_impl() with an appropriate lambda function. See also visit_BinOp() """ self._binary_op_impl(node, "Mult", lambda x, y: x * y)
[docs] def visit_Div(self, node): """ visit_Div() implements the binary division operator by invoking the internal method _binary_op_impl() with an appropriate lambda function. See also visit_BinOp() """ self._binary_op_impl(node, "Div", lambda x, y: x / y)
[docs] def visit_And(self, node): """ visit_And() implements the boolean "and" function by invoking the internal method _binary_op_impl() with an appropriate lambda function. See also visit_BinOp() """ self._binary_op_impl(node, "And", lambda x, y: x and y)
[docs] def visit_Or(self, node): """ visit_Or() implements the boolean "or" function by invoking the internal method _binary_op_impl() with an appropriate lambda function. See also visit_BinOp() """ self._binary_op_impl(node, "Or", lambda x, y: x or y)
# Compare and comparison operators
[docs] def visit_Compare(self, node): """ visit_Compare() implements comparison expressions, e.g. "x <= y". It walks the AST subtrees for the left and right side of the comparison, pushing the corresponding values on the _load stack. It "visits" the comparison operator itself, which results in popping off the values for the two sides and pushing either "True" or "False" on the _load stack as a Bool. See also the comparison operators, e.g. visit_Eq(), visit_NotEq(), visit_Lt(), visit_LtE(), visit_Is(), visit_IsNot(), visit_In(). """ if self._reset_visited_states: setattr(node, 'kpf_completed', False) for item in node.comparators: self.visit(item) self.visit(node.left) for op in node.ops: self.visit(op) return if not getattr(node, 'kpf_completed', False): self.pipeline.logger.debug(f"Compare") loadQSizeBefore = len(self._load) # comparators before left because they're going on a stack, so left can be pulled first for item in node.comparators: self.visit(item) self.visit(node.left) for op in node.ops: self.visit(op) setattr(node, 'kpf_completed', True)
# Comparison operators def _compare_op_impl(self, node, name, func): """ Helper function containing common implementation of comparison operators. """ if self._reset_visited_states: return self.pipeline.logger.debug(name) if len(self._load) < 2: raise RecipeError( f"Comparison operator {name} invoked on line {node.lineno} " + f"with less than two arguments: {len(self._load)}") self._load.append(func(self._load.pop(), self._load.pop()))
[docs] def visit_Eq(self, node): """ visit_Eq() implements the equality comparison operator by invoking the internal method _compare_op_impl() with an appropriate lambda function. See also visit_Compare(). """ self._compare_op_impl(node, "Eq", lambda x, y: x == y)
[docs] def visit_NotEq(self, node): """ visit_NotEq() implements the inequality comparison operator by invoking the internal method _compare_op_impl() with an appropriate lambda function. See also visit_Compare(). """ self._compare_op_impl(node, "NotEq", lambda x, y: x != y)
[docs] def visit_Lt(self, node): """ visit_Lt() implements the less than comparison operator by invoking the internal method _compare_op_impl() with an appropriate lambda function. See also visit_Compare(). """ self._compare_op_impl(node, "Lt", lambda x, y: x < y)
[docs] def visit_LtE(self, node): """ visit_LtE() implements the less than or equal comparison operator by invoking the internal method _compare_op_impl() with an appropriate lambda function. See also visit_Compare(). """ self._compare_op_impl(node, "LtE", lambda x, y: x <= y)
[docs] def visit_Gt(self, node): """ visit_Gt() implements the greater than comparison operator by invoking the internal method _compare_op_impl() with an appropriate lambda function. """ self._compare_op_impl(node, "Gt", lambda x, y: x > y)
[docs] def visit_GtE(self, node): """ visit_GtE() implements the greater than or equal comparison operator by invoking the internal method _compare_op_impl() with an appropriate lambda function. See also visit_Compare(). """ self._compare_op_impl(node, "GtE", lambda x, y: x >= y)
[docs] def visit_Is(self, node): """ visit_Is() implements the "is" comparison operator by invoking the internal method _compare_op_impl() with an appropriate lambda function. See also visit_Compare(). """ self._compare_op_impl(node, "Is", lambda x, y: x is y)
[docs] def visit_IsNot(self, node): """ visit_Eq() implements the "is not" comparison operator by invoking the internal method _compare_op_impl() with an appropriate lambda function. See also visit_Compare(). """ self._compare_op_impl(node, "IsNot", lambda x, y: not (x is y))
[docs] def visit_In(self, node): """ visit_In() implements the "in" range comparison operator by invoking See also visit_Compare(). the internal method _compare_op_impl() with an appropriate lambda function. """ self._compare_op_impl(node, "In", lambda x, y: x in y)
# TODO: implement visit_In and visit_NotIn. Depends on support for Tuple and maybe others
[docs] def visit_Call(self, node): """ visit_Call() implements function call syntax. The function can be one of several built-in functions, or can enqueue a data processing primitive to be run in the Framework. In the latter case, it cooperates with return_recipe() to make available to the recipe values returned from data processing primitives. Whatever the function type, arguments are popped from the _load stack, and results of the call are pushed back onto the _load stack. If the function's name is the special case "invoke_subrecipe", the one expected argument is a path to a recipe file. If it has not already been, it is parsed into an abstract syntax tree (AST) and hung on the "call" tree node for future use. The subtree is then "visited" just like any other subtree. The head of the subrecipe tree is a "module" node, as for all recipes. See visit_module() for more details on how subrecipes are handled. If the name of the function to be called is not "invoke_subrecipe", the registry of built-in functions is checked. (See the documentation for "register_recipe_builtins()" for a list of built-ins and their behavior.) If the function name is not found among the registered built-ins, it is presumed to be a data processing primitive to be run on the Keck framework. The list of arguments popped from the _load stack is wrapped in an Arguments class object, an event is constructed containing the primitive name and the argument object. That event is appended to the Framework's priority event queue. The awaiting_call_return flag is set, and visit_Call() immediately returns. That flag causes an immediate return from each of the visit_<syntax_element>() functions all the way up the call stack, and the instance of start_recipe() or resume_recipe() at the top of the call stack also returns. The Framework is then free to run the next queued event, which is the primitive named in node being processed by this visit_Call(). When the primitive has been run by the framework, the next primitive will be "resume_recipe", which will set the returning_from_call flag and start traversing the previously saved AST tree from the top again. Because of the various kpf_completed attributes set on nodes of the tree, processing will quickly get back to here, with the state of the various tree nodes the same as before. resume_recipe() will have placed the output of the primitive, which is an Arguments class object, on the class instance's call_output property and set the returning_from_call flag. visit_Call() will extract each positional argument value from the Arguments object and push it onto the _load stack, becoming the results of the call. (Any keyword arguments in the Arguments object returned from the primitive are ignored.) """ if self._reset_visited_states: setattr(node, 'kpf_completed', False) for arg in node.args: self.visit(arg) for kw in node.keywords: self.visit(kw) return self.pipeline.logger.debug(f"Call: {node.func.id} on recipe line {node.lineno}; kpf_completed is {getattr(node, 'kpf_completed', False)}") if node.func.id == 'invoke_subrecipe': subrecipe = getattr(node, '_kpf_subrecipe', None) if not subrecipe: self.pipeline.logger.debug(f"invoke_subrecipe: opening and parsing recipe file {node.args[0].s}") # TODO: do some argument checking here with open(node.args[0].s) as f: fstr = f.read() subrecipe = parse(fstr) node._kpf_subrecipe = subrecipe else: self.pipeline.logger.debug(f"invoke_subrecipe: found existing subrecipe of type {type(subrecipe)}") saved_depth = self.subrecipe_depth self.subrecipe_depth = self.subrecipe_depth + 1 self.visit(subrecipe) self.subrecipe_depth = saved_depth if self.awaiting_call_return: return elif not getattr(node, 'kpf_completed', False): if not self.returning_from_call: # SPECIAL CASE: allow print() with zero args # If the callee is our recipe print and there are no # positional arguments, inject a constant "" so the # one-arg arity check passes. if node.func.id == "print" and len(node.args) == 0: node.args.append(_ast.Constant(value="")) # ---------------------------------------------------- # Build and queue up the called function and arguments # as a pipeline event. # The "next_event" item in the event_table, populated # by visit_ImportFrom, will ensure that the recipe # processing will continue by making resume_recipe # the next scheduled event primative. # add keyword arguments kwargs = {} for kwnode in node.keywords: self.visit(kwnode) tup = self._load.pop() kwargs[tup[0]] = tup[1] if node.func.id in self._builtins.keys(): # directly handle one of the registered built-in functions and push # the results on the _load stack func, nargs = self._builtins[node.func.id] if len(node.args) != nargs: self.pipeline.logger.error(f"Call to {node.func.id} takes exactly {nargs} args, got {len(node.args)} on recipe line {node.lineno}") raise RecipeError(f"Call to {node.func.id} takes exactly one arg, got {len(node.args)} on recipe line {node.lineno}") arglist = [] for ix in range(nargs-1, -1, -1): # down through range because _load is a LIFO stack self.visit(node.args[ix]) arglist.append(self._load.pop()) results = func(*arglist, **kwargs) if isinstance(results, tuple): self.pipeline.logger.debug(f"Call (builtin): returned tuple, unpacking") for item in results: self.pipeline.logger.debug(f"Call (builtin): appending {item} of type {type(item)} to _load") self._load.append(item) else: self.pipeline.logger.debug(f"Call (builtin): appending {results} of type {type(results)} to _load") self._load.append(results) else: # Prepare arguments for and enqueue a data processing primitive to be executed # by the Framework, set the self.awaiting_call_return flag, and return. event_args = Arguments(name=node.func.id+"_args", **kwargs) # add positional arguments for argnode in node.args: self.visit(argnode) event_args.append(self._load.pop()) try: self.context.push_event(node.func.id, event_args) except: self.context.append_event(node.func.id, event_args) self.pipeline.logger.info(f"Queued {node.func.id} with args {str(event_args)}; awaiting return.") # self.awaiting_call_return = True return else: # returning from a call (pipeline event): # Get any returned values, stored by resume_recipe() in self.call_output, # and push them on the _load stack for Assign (or whatever) to handle. self.pipeline.logger.debug(f"Call on recipe line {node.lineno} returned output {self.call_output}") if isinstance(self.call_output, Arguments): # got output that we can deal with, otherwise, ignore the returned value for ix in range(len(self.call_output)): self._load.append(self.call_output[ix]) self.call_output = None self.returning_from_call = False setattr(node, 'kpf_completed', True)
[docs] def visit_keyword(self, node): """ visit_keyword() implements the syntax of keyword arguments (as opposed to positional arguments) within function calls. It does so by generating tuples of (keyword, value), generating the value by walking the corresponding AST subtree. The resulting tuple is stored on the _load stack, replacing the keyword name item. """ if self._reset_visited_states: self.visit(node.value) return # let the value node put the value on the _load stack self.visit(node.value) val = self._load.pop() self.pipeline.logger.debug(f"keyword: {val}") self._load.append((node.arg, val))
[docs] def visit_If(self, node): """ visit_If() implements conditional execution triggered by an "if" or "if ... else" statement. Evaluate the test and visit one of the two branches, body or orelse. Note: The python "if" expression, e.g. x = a if <condition> else b, is not supported in recipes. """ if self._reset_visited_states: setattr(node, 'kpf_completed', False) setattr(node, 'kpf_completed_test', False) if hasattr(node, 'kpf_boolResult'): delattr(node, 'kpf_boolResult') self.visit(node.test) for item in node.body: self.visit(item) for item in node.orelse: self.visit(item) return if not getattr(node, 'kpf_completed', False): if not getattr(node, 'kpf_completed_test', False): loadQSizeBefore = len(self._load) self.visit(node.test) if len(self._load) <= loadQSizeBefore: raise RecipeError( f"visit_If: on recipe line {node.lineno}, test didn't push a result on the _load stack") boolResult = self._load.pop() self.pipeline.logger.info(f"If condition on recipe line {node.lineno} was {boolResult}") setattr(node, 'kpf_boolResult', boolResult) setattr(node, 'kpf_completed_test', True) else: boolResult = getattr(node, 'kpf_boolResult') if boolResult: self.pipeline.logger.debug( f"If on recipe line {node.lineno} pushing and visiting Ifso") for item in node.body: self.visit(item) if self.awaiting_call_return: return else: self.pipeline.logger.debug( f"If on recipe line {node.lineno} pushing and visiting Else") for item in node.orelse: self.visit(item) if self.awaiting_call_return: return setattr(node, 'kpf_completed', True)
[docs] def visit_List(self, node): """ visit_List() implements the "[<list item>, <list item>, ...]" list syntax from Python. It does this by visiting the tree node representing each list element in turn. Visiting each node results in a new item pushed onto the _load stack. After visiting each subtree, the new items on the _load stack are popped and appended onto a list object. Finally, the list object is pushed onto the _load stack. """ if self._reset_visited_states: setattr(node, 'kpf_completed', False) for elt in node.elts: self.visit(elt) return self.pipeline.logger.debug(f"List") if not getattr(node, "kpf_completed", False): l = [] loadDepth = len(self._load) for elt in node.elts: self.visit(elt) if len(self._load) > loadDepth: l.append(self._load.pop()) else: raise RecipeError("List: expected item to append to list, but none was found") self._load.append(l) setattr(node, "kpf_completed", True)
[docs] def visit_Tuple(self, node): """ visit_Tuple() implements the "(<tuple item>, <tuple item>, ...)" tuple syntax from Python. It does this by visiting the tree node representing each tuple element in turn. Visiting each node results in a new item pushed onto the _load stack. After visiting each subtree, the new items on the _load stack are popped and appended onto a list object. Finally, the list object is pushed onto the _load stack. Internally, the tuple is generated by calling visit_List() to build a list on the _load stack, and then converting it into a tuple. """ self.pipeline.logger.debug(f"Tuple") if self._reset_visited_states: setattr(node, 'kpf_completed', False) for elt in node.elts: self.visit(elt) return if not getattr(node, "kpf_completed", False): if isinstance(node.ctx, _ast.Store): for elt in node.elts: self.visit(elt) elif isinstance(node.ctx, _ast.Load): self.visit_List(node) if not isinstance(self._load[len(self._load)-1], list): raise RecipeError("visit_Tuple() expected a list on the _load stack, " f"but got {self._load[len(self._load)-s]}") self._load.append(tuple(self._load.pop())) setattr(node, "kpf_completed", True) else: raise RecipeError( f"visit_Tuple: on recipe line {node.lineno}, ctx is unexpected type: {type(node.ctx)}")
[docs] def visit_NameConstant(self, node): """ visit_NameConstant() implements python NameConstant syntax element by pushing the value on the _load stack. """ if self._reset_visited_states: return self.pipeline.logger.debug(f"NameConstant: {node.value}") #ctx of NameConstant is always Load self._load.append(node.value)
[docs] def visit_Num(self, node): """ visit_Num() implements a numeric constant by pushing it on the _load stack. """ if self._reset_visited_states: return self.pipeline.logger.debug(f"Num: {node.n}") # ctx of Num is always Load self._load.append(node.n)
[docs] def visit_Str(self, node): """ visit_Str() implements a string constant by pushing its value on the _load stack. Multiline strings delimited by three double-quotes will result in string values with embedded line breaks. Multiple quoted strings with no separator character e.g. comma, are not automatically concatenated, as would be in Python. """ if self._reset_visited_states: return self.pipeline.logger.debug(f"Str: {node.s}") # ctx of Str is always Load self._load.append(node.s)
# ----------------------------------------------------------------- # F-string (JoinedStr) support # -----------------------------------------------------------------
[docs] def visit_JoinedStr(self, node): """ Evaluate a Python f-string (ast.JoinedStr) and push the resulting string onto the _load stack so that built-ins like print() can use it. Only simple `{expr}` interpolation is supported; conversion flags (!s, !r, !a) and format specs (':…') are ignored. That is enough for recipe logging. """ # -------- Reset-state pass (e.g. inside loops) -------- if self._reset_visited_states: for part in node.values: # Walk sub-expressions so their states reset, too if isinstance(part, _ast.FormattedValue): self.visit(part.value) return # -------- Normal execution pass -------- pieces = [] for part in node.values: if isinstance(part, _ast.Constant): # literal text pieces.append(part.value) elif isinstance(part, _ast.FormattedValue): # {expr} # Evaluate the expression; its value is now on _load self.visit(part.value) expr_val = self._load.pop() pieces.append(str(expr_val)) else: # something exotic we don't support raise RecipeError( f"Unsupported component in f-string: {type(part).__name__}" ) # Push the fully-assembled string back for the caller (e.g. print) self._load.append("".join(pieces))
[docs] def visit_Expr(self, node): """ visit_Expr() implements an expression by pushing the resulting value on the _load stack. Expression syntax elements are produced by the AST parser only rarely in recipe situations, for example when an expression stands alone, not as part of an assignment statement. """ if self._reset_visited_states: setattr(node, 'kpf_completed', False) self.visit(node.value) return if not getattr(node, 'kpf_completed', False): self.visit(node.value) if self.awaiting_call_return: return setattr(node, 'kpf_complted', True)
[docs] def visit_Attribute(self, node): """ visit_Attribute() implements the syntax e.g. of an object attribute access, e.g. "a.key". It does so by getting the name of the attribute, and then testing to see of the object at the top of the _load stack has an attribute of that name. If it does, the value of the attribute replaces the original object at the top of the _load stack. If no such attribute exists on the object, an message is logged and RecipeError is raised. """ if self._reset_visited_states: return self.visit(node.value) obj = self._load.pop() if isinstance(node.ctx, _ast.Load): try: if 'getValue' in obj.__dir__(): value = obj.getValue(node.attr) else: value = obj[node.attr] # print(f"Attribute: value is {type(value)}: {value}") except (KeyError, AttributeError): self.pipeline.logger.error( f"Object {obj} on line {node.lineno} of recipe has no attribute {node.attr}.") raise RecipeError( f"Object {obj} on line {node.lineno} of recipe has no attribute {node.attr}.") self.pipeline.logger.debug(f"Name is loading {value} from {node.attr}") self._load.append(value) elif isinstance(node.ctx, _ast.Store): self.pipeline.logger.error( f"Assigning to dictionary attribute on line {node.lineno} not supported") raise RecipeError( f"Assigning to dictionary attribute on line {node.lineno} not supported")
[docs] def visit_Subscript(self, node): """ visit_Subscript() implements subscript syntax of the form a[i], where the subscript value is a single index. The behavior is to replace the object on the _load stack with the value of the indexed item. See also visit_Index(). Note: Slice subscripts of the form a[i:j] are not supported. """ if self._reset_visited_states: return if isinstance(node.ctx, _ast.Load): self.visit(node.value) value = self._load.pop() self.visit(node.slice) sliceName = self._load.pop() self._load.append(value[sliceName]) elif isinstance(node.ctx, _ast.Store): self.pipeline.logger.error( f"Assigning to subscript {node.sliceName} on recipe line {node.lineno} not supported") raise RecipeError( f"Assigning to subscript {node.sliceName} on recipe line {node.lineno} not supported")
[docs] def visit_Index(self, node): """ visit_Index() doesn't do anything special. It simply visits the subtree corresponding to the index value. That (or those) nodes will have the result of pushing some appropriate value onto the _load stack. visit_Index() doesn't need to alter that value in any way. Typically visit_Subscript() will use that value to perform the actual indexing operation. """ if self._reset_visited_states: return self.visit(node.value)
[docs] def generic_visit(self, node): """ generic_visit() is called if no explicit visitor function exists for a node. It logs a message in the pipeline logger noting that the recipe contained an unsupported syntax element, and then raises RecipeError with the same message. """ self.pipeline.logger.error( f"generic_visit: got unsupported node {node.__class__.__name__}") raise RecipeError( f"Unsupported language feature: {node.__class__.__name__}")
[docs] def reset_visited_states(self, node): """ reset_visited_states() walks the tree below the node where the call is made after setting the class member self._reset_visited_states. Each visit_<syntax_element>, when called with that flag set, should reset its state so that an enclosing loop can be properly executed again with new parameters. reset_visited_states() is called in visit_<> methods that control looping, such as visit_For() """ self._reset_visited_states = True self.awaiting_call_return = False self.returning_from_call = False self.call_output = None self.visit(node) self._load.clear() self._store.clear() self._reset_visited_states = False