Source code for apssh.yaml_loader

"""
the YamlLoader class - how to create objects in YAML
"""

from pathlib import Path

import yaml

from jinja2 import Template, DebugUndefined


from asynciojobs import Scheduler
import apssh
from apssh import SshJob, formatters # , Run, RunScript, RunString, Push, Pull
from apssh.nodes import SshNode, LocalNode


WARNING = """
# WARNING: this file was produced automatically - DO NOT EDIT !
# see {original} instead
#
"""

# for error management, provide more context in the exception
def build_instance(cls, *args, **kwds):
    try:
        return cls(*args, **kwds)
    except TypeError as exc:
        args = [
            f"when trying to build instance of {cls.__name__}",
            f"with {args=}",
            f"and {kwds=}"
        ]
        raise TypeError(*args, *exc.args)
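
# Illustration only (not part of the module): build_instance exists to make
# constructor errors easier to trace back to the YAML input. Assuming some
# hypothetical class SomeCommand whose constructor rejects a 'colour' keyword,
#     build_instance(SomeCommand, "echo", "hello", colour="red")
# would re-raise the TypeError with extra context along the lines of
#     when trying to build instance of SomeCommand
#     with args=('echo', 'hello')
#     and kwds={'colour': 'red'}
# followed by the original constructor error message.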

class YamlLoader:

    """
    The `YamlLoader` class builds a Scheduler object from a YAML file

    In addition to using the regular YAML syntax (the current implementation
    uses pyyaml, which supports YAML v1.1), the input can optionally pass
    through Jinja2 templating; to that end, call the ``load*`` methods with a
    non-empty ``env`` parameter, which specifies the templating variables

    Parameters:
      filename(str): the input file (can be a Path as well)

    Example:
      a simple example can be found `in github repo fit-r2lab/demos
      <https://github.com/fit-r2lab/r2lab-demos/tree/master/my-first-nepi-ng-script>`_
      where the same script is written

      * `once in demo-v1.py using pure Python
        <https://github.com/fit-r2lab/r2lab-demos/blob/master/my-first-nepi-ng-script/demo-v1.py>`_

      * and `a second time in demo-v2.py
        <https://github.com/fit-r2lab/r2lab-demos/blob/master/my-first-nepi-ng-script/demo-v2.py>`_
        using `this script definition in YAML
        <https://github.com/fit-r2lab/r2lab-demos/blob/master/my-first-nepi-ng-script/demo-v2.yaml.j2>`_
    """

    def __init__(self, filename):
        self.path = Path(filename)

    def load(self, env=None, *, save_intermediate=None) -> Scheduler:
        """
        parse the input filename and return a `Scheduler` object;
        this is a shortcut to using `load_with_maps()` and discarding
        the intermediate maps

        same parameters as `load_with_maps`
        """
        _nodes_map, _jobs_map, scheduler = self.load_with_maps(
            env, save_intermediate=save_intermediate)
        return scheduler

    def load_with_maps(self, env=None, *, save_intermediate=None):
        """
        parse the input filename

        Parameters:
          env(dict): if not empty, a Jinja2 pass is performed on the input
          save_intermediate: defaults to None, meaning do nothing; if
            provided, this parameter means to save the output of the Jinja
            templating phase, typically for debugging purposes; if set to
            `True`, the output filename is computed from the object's
            filename as provided at constructor time; alternatively you may
            also pass a string, or a Path instance.
            If env is None, this parameter is ignored.

        Returns:
          a tuple containing:

          * nodes_map, a dictionary linking ids to SshNode instances
          * jobs_map, a dictionary linking ids to Job instances
          * the resulting scheduler
        """
        with self.path.open() as feed:
            yaml_input = feed.read()

        if env:
            template = Template(yaml_input, undefined=DebugUndefined)
            yaml_input = template.render(**env)
            yaml_input = WARNING.format(original=self.path) + yaml_input
            # save this intermediate form for debugging or documentation
            if save_intermediate is None or save_intermediate is False:
                pass
            elif isinstance(save_intermediate, (str, Path)):
                if isinstance(save_intermediate, str):
                    save_intermediate = Path(save_intermediate)
                with save_intermediate.open('w') as writer:
                    writer.write(yaml_input)
                print(f"save_intermediate: (over)wrote plain YAML {save_intermediate}")
            else:
                # compute a filename
                if self.path.suffix == ".j2":
                    # simply remove the .j2
                    intermediate_path = self.path.with_suffix("")
                else:
                    # add a .tmp
                    intermediate_path = self.path.parent / (self.path.name + ".tmp")
                with intermediate_path.open('w') as writer:
                    writer.write(yaml_input)
                print(f"save_intermediate: (over)wrote plain YAML {intermediate_path}")

        D = yaml.safe_load(yaml_input)          # pylint: disable=invalid-name
        if 'nodes' not in D:
            raise ValueError(f"file {self.path} has no 'nodes' key")
        if 'jobs' not in D:
            raise ValueError(f"file {self.path} has no 'jobs' key")
        try:
            nodes_map = self._load_nodes(D['nodes'])
            jobs_map, scheduler = self._load_jobs(D['jobs'], nodes_map)
            return nodes_map, jobs_map, scheduler
        except (ValueError, TypeError) as exc:
            print("\n".join(exc.args))
            print("FATAL - aborting with an empty scheduler")
            return {}, {}, Scheduler()
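
    # For illustration (hypothetical filenames): with save_intermediate=True,
    # the intermediate file name is derived from the input path, e.g.
    #     demo.yaml.j2  ->  demo.yaml        (the .j2 suffix is removed)
    #     demo.yaml     ->  demo.yaml.tmp    (a .tmp suffix is appended)
    # while save_intermediate="rendered.yaml" would use that exact path.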

    @staticmethod
    def _dict_to_class(D, cls, mandatories, optionals):
        # pylint: disable=invalid-name
        """
        translate a dict into a class instance

        the 'id' field is always mandatory

        mandatories and optionals are both expected to be dictionaries
        that map a field name to a transformer function that is applied
        to the corresponding value; if the transformer is None, no
        transformation is performed

        mandatory values are passed to the constructor; as far as
        optionals go, the result is assigned right into object.key

        returns a tuple (id, obj)
        """
        assert 'id' in D
        # check for mandatories
        for key in mandatories:
            if key not in D:
                raise ValueError(f"dict {D} misses the `{key}` field")
        # check for unexpected keys
        for key in D:
            if key == 'id':
                continue
            if key not in mandatories and key not in optionals:
                raise ValueError(f"{D} contains unexpected key {key}")
        # build the object from the mandatory fields
        constructor_args = {}
        for key, transformer in mandatories.items():
            value = D[key]
            if transformer:
                value = transformer(value)
            constructor_args[key] = value
        obj = build_instance(cls, **constructor_args)
        # assign optionals as attributes
        for key, transformer in optionals.items():
            if key not in D:
                continue
            value = D[key]
            if transformer:
                value = transformer(value)
            setattr(obj, key, value)
        return (D['id'], obj)

    def _load_nodes(self, nodes_list):
        """
        returns a dict:
        - keys are the ids defined in each node (mandatory)
        - values are SshNode objects
        """
        nodes_map = {}

        def locate_node_from_id(node_id):
            return nodes_map[node_id]

        def locate_formatter(clsname):
            # formatters is the apssh.formatters module
            cls = getattr(formatters, clsname)
            return cls()

        node_mandatories = {
            'hostname': None,
        }
        node_optionals = {
            # the gateway field refers to an id defined beforehand
            'gateway': locate_node_from_id,
            'username': None,
            'critical': None,
            'formatter': locate_formatter,
            'verbose': None,
        }
        local_mandatories = {
        }
        local_optionals = {
            'formatter': locate_formatter,
            'verbose': None,
        }

        for node_dict in nodes_list:
            if 'localnode' in node_dict:
                del node_dict['localnode']
                node_id, node = self._dict_to_class(
                    node_dict, LocalNode, local_mandatories, local_optionals)
            else:
                node_id, node = self._dict_to_class(
                    node_dict, SshNode, node_mandatories, node_optionals)
            nodes_map[node_id] = node
        return nodes_map

    def _load_jobs(self, jobs_list, nodes_map):
        jobs_map = {}

        def locate_node_from_id(node_id):
            return nodes_map[node_id]

        def locate_requirement(req_id_s):
            # search the instance or instances of jobs in the current map
            if isinstance(req_id_s, str):
                return {jobs_map[req_id_s]}
            else:
                return {jobs_map[req_id] for req_id in req_id_s}

        def create_commands(commands_list):

            def get_and_delete_key(D, k, default=None):
                # pylint: disable=invalid-name
                if k in D:
                    result = D[k]
                    del D[k]
                    return result
                if default is None:
                    raise ValueError(
                        f"building a command from {D}\n"
                        f"you need to define either 'command' or '{k}'")
                return default

            result = []
            for command_dict in commands_list:
                # locate the class
                classname = command_dict['type']
                del command_dict['type']
                cls = getattr(apssh, classname)
                if 'Run' in classname:
                    # the 'usual' shorthand is to simply define 'command'
                    if 'command' in command_dict:
                        argv = command_dict['command'].split()
                        del command_dict['command']
                        command_instance = build_instance(
                            cls, *argv, **command_dict)
                    # however in some cases the 'split' approach may not work
                    # as desired
                    elif classname == 'Run':
                        argv = get_and_delete_key(command_dict, 'argv')
                        command_instance = build_instance(
                            cls, *argv, **command_dict)
                    elif classname == 'RunScript':
                        text = get_and_delete_key(command_dict, 'local_script')
                        args = get_and_delete_key(command_dict, 'args', [])
                        command_instance = build_instance(
                            cls, text, *args, **command_dict)
                    elif classname == 'RunString':
                        text = get_and_delete_key(command_dict, 'script_body')
                        args = get_and_delete_key(command_dict, 'args', [])
                        command_instance = build_instance(
                            cls, text, *args, **command_dict)
                elif classname in ('Push', 'Pull'):
                    command_instance = build_instance(cls, **command_dict)
                result.append(command_instance)
            return result

        def strip_label(label):
            return label.strip()

        mandatories = {
            'node': locate_node_from_id,
            'commands': create_commands,
        }
        optionals = {
            'required': locate_requirement,
            'critical': None,
            'verbose': None,
            'label': strip_label,
        }

        for job_dict in jobs_list:
            job_id, job = self._dict_to_class(
                job_dict, SshJob, mandatories, optionals)
            jobs_map[job_id] = job

        scheduler = Scheduler()
        # insert the jobs in the scheduler
        scheduler.update(jobs_map.values())
        return jobs_map, scheduler
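
# --------------------------------------------------------------------------
# Usage sketch (not part of the module): a minimal, hypothetical YAML input
# matching the structure that _load_nodes and _load_jobs above expect.
# The ids, hostnames and commands below are made up for illustration only.
#
#     nodes:
#       - id: gateway
#         hostname: gateway.example.org
#         username: root
#       - id: worker
#         hostname: worker1
#         gateway: gateway        # refers to the node id defined above
#         username: root
#     jobs:
#       - id: prepare
#         node: worker
#         commands:
#           - type: Run
#             command: hostname
#       - id: measure
#         node: worker
#         required: prepare       # a job id, or a list of job ids
#         commands:
#           - type: RunString
#             script_body: |
#               echo "running on $(hostname)"
#
# Loading it could then look like the following (assuming the file is named
# demo.yaml; passing a non-empty env would trigger a Jinja2 pass first):
#
#     from apssh.yaml_loader import YamlLoader
#     scheduler = YamlLoader("demo.yaml").load()
#     ok = scheduler.run()        # run() as provided by asynciojobs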