"""
The ``commands`` module implements all the command classes, typically
:class:`Run`, :class:`RunScript`, :class:`Pull`, and similar classes.
"""
from pathlib import Path
import random
import re
import copy
from asyncssh import EXTENDED_DATA_STDERR
from .formatters import CaptureFormatter
from .deferred import Capture
from .config import default_remote_workdir
####################
# The base class for items that make a SshJob's commands
[docs]
class AbstractCommand:
"""
Abstract base class for all command classes.
Parameters:
label: optional label used when representing a scheduler
textually or graphically
allowed_exits: the default is to only allow the command to exit(0).
Using ``allowed_exits``, one can whitelist a set of exit codes
or signals. If the command returns one of these codes, or receives
one of these signals, it is deemed to have completed successfully.
A retcod 0 is always allowed.
Examples:
``allowed_exits=["TERM", 4]`` would allow the command to either
return exit code ``4``, or to end after receiving signal `'TERM'`.
Refer to the POSIX documentation for signal names, like `QUIT`
or `ALRM`.
Note:
``allowed_exits`` is typically useful when a command starts a process
that is designed to be killed by another command later in the scheduler.
"""
def __init__(self, *, label=None,
allowed_exits=None):
"""
Some code
"""
# handle default, create empty set if needed
if allowed_exits is None:
allowed_exits = set()
else:
allowed_exits = set(allowed_exits)
# store local attributes
self.label = label
self.allowed_exits = allowed_exits
def __repr__(self):
return f"<{type(self).__name__}: {self.get_label_line()}>"
###
[docs]
async def co_run_remote(self, node):
"""
Needs to be redefined on actual command classes.
Returns:
Should return 0 if everything is fine.
"""
pass
[docs]
async def co_run_local(self, localnode):
"""
Needs to be redefined on actual command classes that want
to support running on a :class:`~apssh.nodes.LocalNode` as well.
Returns:
Should return 0 if everything is fine.
"""
pass
# extra messages go to stderr and are normally formatted
def _verbose_message(self, node, message):
if not hasattr(self, 'verbose') or not self.verbose: # pylint: disable=no-member
return
if not message.endswith("\n"):
message += "\n"
node.formatter.line(message, EXTENDED_DATA_STDERR, node.hostname)
# descriptive views, required by SshJob
def get_label_line(self): # pylint: disable=c0111
attempt = self.label
if attempt is not None:
return attempt
attempt = self.label_line() # pylint: disable=e1111
if attempt is not None:
return attempt
return f"NO-LABEL-LINE (class {type(self).__name__})"
[docs]
def label_line(self):
"""
Used by SshJob to conveniently show the inside of a Job;
intended to be redefined by daughter classes.
Returns:
str: a one-line string
"""
pass
[docs]
class CapturableMixin:
"""
this class implements the simple logic for capturing a command output
NOTE. it relies on the presence of the `self.node` attribute that points
back at the SshNode where this command is going to run;
which is set by the SshJob class
"""
def __init__(self, capture: Capture):
self.capture = capture
def start_capture(self): # pylint: disable=missing-function-docstring
if self.capture:
# store the node's formatter and set aside for later
self.previous_formatter = self.node.formatter
self.node.formatter = CaptureFormatter()
def end_capture(self): # pylint: disable=missing-function-docstring
if self.capture:
# get result from transient formatter
captured = self.node.formatter.get_capture()
# restore the node's formatter
self.node.formatter = self.previous_formatter
self.previous_formatter = None
# sanitize
if captured and captured[-1] == "\n":
captured = captured[:-1]
# store captured in self.capture
variables = self.capture.variables
varname = self.capture.varname
# print(f"in variables, {varname} assigned to {captured}")
variables[varname] = captured
[docs]
class StrLikeMixin:
"""
the various Run* classes need to look like a str object for some
operations, like minimally the following dunder methods
this is needed for the deferred operation mode, where command objects
need to remain as Deferred objects and not str, as that would imply early evaluation
"""
def __str__(self):
return self._remote_command()
def __add__(self, strlike):
result = copy.copy(self)
result.argv += str(strlike)
[docs]
class Run(AbstractCommand, CapturableMixin, StrLikeMixin):
"""
The most basic form of a command is to run a remote command
Parameters:
argv: the parts of the remote command.
The actual command run remotely is obtained by
concatenating the string representation of each argv
and separating them with a space.
label: if set, is used to describe the command in scheduler graphs.
verbose (bool): if set, the actual command being run is printed out.
x11 (bool): if set, will enable X11 forwarding, so that a X11 program
running remotely ends on the local DISPLAY.
ignore_outputs(bool): this flag is currently used only when running on a LocalNode();
in that case, the stdout and stderr of the forked process are bound to /dev/null,
and no attempt is made to read them; this has turned out a useful trick when
spawning port-forwarding ssh sessions
Examples:
Remotely run ``tail -n 1 /etc/lsb-release`` ::
Run("tail -n 1 /etc/lsb-release")
The following forms are exactly equivalent::
Run("tail", "-n", 1, "/etc/lsb-release")
Run("tail -n", 1, "/etc/lsb-release")
"""
# it was tempting to use x11_forwarding as the name here, but
# first it's definitely too long, given the usage of Run
# plus, maybe some day we'll need to add other keywords
# to create_connection than just x11_forwarding,
# so, it feels about right to call this just like x11
def __init__(self, *argv,
# proper
verbose=False, x11=False, ignore_outputs=False,
# AbstractCommand
label=None, allowed_exits=None,
# CapturableMixin
capture: Capture = None):
self.argv = argv
self.verbose = verbose
self.x11 = x11
self.ignore_outputs = ignore_outputs
AbstractCommand.__init__(self, label=label,
allowed_exits=allowed_exits)
CapturableMixin.__init__(self, capture)
[docs]
def label_line(self):
"""
One-line rendering is to use the ``label`` attribute if set,
by default it is the full remote command.
"""
return self._remote_command()
def _remote_command(self):
return " ".join(str(x) for x in self.argv)
[docs]
async def co_run_remote(self, node):
"""
The semantics of running on a remote node.
"""
self.start_capture()
command = self._remote_command()
self._verbose_message(node, f"Run: -> {command}")
# need an ssh connection
connected = await node.connect_lazy()
if not connected:
return
node_run = await node.run(command, x11_forwarding=self.x11)
self._verbose_message(
node, f"Run: {node_run} <- {command}")
self.end_capture()
return node_run
[docs]
async def co_run_local(self, localnode):
"""
The semantics of running on a local node.
"""
self.start_capture()
command = self._remote_command()
self._verbose_message(localnode, f"Run: -> {command}")
retcod = await localnode.run(command, ignore_outputs=self.ignore_outputs)
self._verbose_message(
localnode, f"Run: {retcod} <- {command}")
self.end_capture()
return retcod
# the base class for running a script provided locally
# same as Run, but the command to run remotely is provided
# as a local material, either a local file, or a python string
[docs]
class RunLocalStuff(AbstractCommand, CapturableMixin, StrLikeMixin):
"""
The base class for ``RunScript`` and ``RunString``.
This class implements the common logic for a local script that
needs to be copied over before being executed.
Parameters:
args: the argument list for the remote command
label: if set, is used to describe the command in scheduler graphs.
includes: a collection of local files that need to be copied over
as well; get copied in the same directory as the remote script.
verbose: print out more information if set; this additionnally causes
the remote script to be invoked through ``bash -x``, which admittedly
is totally hacky. xxx we need to remove this.
remote_basename: an optional name for the remote copy of the script.
Local commands are copied in a remote directory
- typically in ``~/.apssh-remote``.
Also, all copies are done under a name that contains a random string to
avoid collisions. This is because two parallel runs of the same command
would otherwise be at risk of one overwriting the remote command file,
while the second tries to run it, which causes errors like this::
fit26: .apssh-remote/B3.sh: /bin/bash: bad interpreter: Text file busy
"""
def __init__(self, args, *,
label=None, allowed_exits=None,
includes=None, remote_basename=None,
x11=False, verbose=False,
ignore_outputs=False,
capture: Capture=None):
self.args = args
self.includes = includes if includes is not None else []
self.remote_basename = remote_basename
self.x11 = x11
self.verbose = verbose
self.ignore_outputs = ignore_outputs
AbstractCommand.__init__(self, label=label,
allowed_exits=allowed_exits)
CapturableMixin.__init__(self, capture)
def __str__(self):
return self._remote_command()
@staticmethod
def _random_id():
"""
Generate a random string to avoid conflicting names
on the remote host
"""
return "".join(random.choice('abcdefghijklmnopqrstuvwxyz')
for i in range(8))
def _args_line(self):
return " ".join(str(x) for x in self.args)
def _remote_command(self): # pylint: disable=c0111
command = self.remote_basename + " " + self._args_line()
command = default_remote_workdir + "/" + command
if self.verbose:
command = "bash -x " + command
return command
[docs]
async def co_install(self, node, remote_path):
"""
Abstract method to explain how to remotely install
a local script before we can invoke it
"""
print(f"coroutine method co_install"
f" needs to be redefined on your RunLocalStuff subclass"
f" args are {node} and {remote_path}")
[docs]
async def co_run_remote(self, node):
"""
Implemented to satisfy the requirement of ``AbstractCommand``.
The common behaviour for both classes is to first invoke
:meth:`co_install()` to push the local material
over; it should raise an exception in case of failure.
"""
# we need the node to be connected by ssh and SFTP
# and we need the remote work dir to be created
if not (await node.sftp_connect_lazy()
and await node.mkdir(default_remote_workdir)):
# should never be here
return
remote_path = default_remote_workdir + "/" + self.remote_basename
# do the remote install - depending on the actual class
await self.co_install(node, remote_path)
# make sure the remote script is executable - chmod 755
permissions = 0o755
await node.sftp_client.chmod(remote_path, permissions)
if self.includes:
# sequential is good enough
for include in self.includes:
if not Path(include).exists():
print(f"include file {include} not found -- skipped")
continue
self._verbose_message(
node,
f"RunLocalStuff: pushing include {include} in {default_remote_workdir}")
if not await node.put_file_s(
include, default_remote_workdir + "/",
follow_symlinks=True):
return
# trigger it
self.start_capture()
command = self._remote_command()
self._verbose_message(node, f"RunLocalStuff: -> {command}")
node_run = await node.run(command, x11_forwarding=self.x11)
self._verbose_message(
node, f"RunLocalStuff: {node_run} <- {command}")
self.end_capture()
return node_run
# virtual method that needs to be implemented on each subclass
def _actual_contents(self) -> str:
print(f"_actual_contents needs to be redefined on {type(self)}")
pass
[docs]
async def co_run_local(self, localnode):
"""
the behaviour of these commands when run LocalNode instance
"""
# doing a local copy is mandatory anyway for RunString
# also this way we can do chmod +x on it
local_copy = Path.home() / default_remote_workdir / self.remote_basename
with local_copy.open('w') as writer:
writer.write(self._actual_contents())
# make executable
local_copy.chmod(0o700)
self.start_capture()
command = f"{Path.home()}/{self._remote_command()}"
self._verbose_message(localnode, f"Run: -> {command}")
retcod = await localnode.run(command, ignore_outputs=self.ignore_outputs)
print(f"{retcod=}")
self._verbose_message(localnode, f"Run: {retcod} <- {command}")
self.end_capture()
return retcod
# same but using a script that is available as a local file
[docs]
class RunScript(RunLocalStuff):
"""
A class to run a **local script file** on the remote system,
but with arguments passed exactly like with `Run`
Parameters:
local_script: the local filename for the script to run remotely
args: the arguments for the remote script; like with :class:`Run`,
these are joined with a space character
label: if set, is used to describe the command in scheduler graphs.
includes: a collection of local files to be copied over in the same
location as the remote script, i.e. typically in ``~/.apssh-remote``
x11 (bool): allows to enable X11 x11_forwarding
verbose: more output
Examples:
Run a local script located in ``../foo.sh`` with specified args::
RunScript("../foo.sh", "arg1", 2, "arg3")
or equivalently::
RunScript("../foo.sh", "arg1 2", "arg3")
"""
def __init__(self, local_script, *args,
label=None, allowed_exits=None,
includes=None, x11=False,
# if this is set, run bash -x
verbose=False,
capture: Capture=None):
self.local_script = local_script
self.local_basename = Path(local_script).name
remote_basename = self.local_basename + '-' + self._random_id()
super().__init__(args,
label=label,
allowed_exits=allowed_exits,
includes=includes,
remote_basename=remote_basename,
x11=x11, verbose=verbose, capture=capture)
[docs]
def label_line(self):
return "RunScript: " + self.local_basename + " " + self._args_line()
[docs]
async def co_install(self, node, remote_path):
if not Path(self.local_script).exists():
raise OSError(f"RunScript : {self.local_script} not found - bailing out")
if not await node.put_file_s(
self.local_script, remote_path,
follow_symlinks=True):
return
def _actual_contents(self) -> str:
with open(self.local_script) as feed:
return feed.read()
#####
[docs]
class RunString(RunLocalStuff):
"""
Much like RunScript, but the script to run remotely is expected
to be passed in the first argument as **a python string** this time.
Parameters:
script_body(str): the **contents** of the script to run remotely.
args: the arguments for the remote script; like with :class:`Run`,
these are joined with a space character
label: if set, is used to describe the command in scheduler graphs.
includes: a collection of local files to be copied over in the same
location as the remote script, i.e. typically in ``~/.apssh-remote``
x11 (bool): allows to enable X11 x11_forwarding
remote_name: if provided, will tell how the created script
should be named on the remote node; it is randomly generated
if not specified by caller.
verbose: more output
Examples:
Here's how to call a simple bash wrapper remotely::
myscript = "#!/bin/bash\\nfor arg in "$@"; do echo arg=$arg; done"
scheduler.add(
RunString(myscript, "foo", "bar", 2, "arg3",
remote_name = "echo-args.sh"))
"""
def __init__(self, script_body, *args,
label=None, allowed_exits=None,
includes=None, x11=False,
# the name under which the remote command will be installed
remote_name=None,
# if this is set, run bash -x
verbose=False,
capture: Capture=None):
self.script_body = script_body
if remote_name:
self.remote_name = remote_name
# just in case
remote_basename = Path(self.remote_name).name
remote_basename += '-' + self._random_id()
else:
self.remote_name = ''
remote_basename = self._random_id()
super().__init__(args,
label=label,
allowed_exits=allowed_exits,
includes=includes,
remote_basename=remote_basename,
x11=x11, verbose=verbose,
capture=capture)
@staticmethod
def _relevant_first_line(body):
blank = re.compile(r'\A\s*\Z')
comment = re.compile(r'\A\s*#')
for line in body.split("\n"):
if comment.match(line) or blank.match(line):
continue
return line.strip()
return "??? empty body ???"
WIDTH = 15
def _truncated(self, width=None):
if width is None:
width = self.WIDTH
body = self._relevant_first_line(self.script_body)
if len(body) <= width:
return body
# generate {:15.15s}...
truncate_format = "{{:{width}.{width}s}}...".format(width=width)
return truncate_format.format(body)
[docs]
def label_line(self):
return f"RunString: {self._truncated()} {self._args_line()}"
[docs]
async def co_install(self, node, remote_path):
self._verbose_message(
node, f"RunString: pushing script into {remote_path}")
if not await node.put_string_script(
self.script_body, remote_path):
return
def _actual_contents(self) -> str:
return self.script_body
####
[docs]
class Pull(AbstractCommand):
"""
Retrieve remote files and stores them locally
Parameters:
remotepaths: a collection of remote paths to be retrieved.
localpath: the local directory where to store resulting copies.
label: if set, is used to describe the command in scheduler graphs.
verbose (bool): be verbose.
kwds: passed as-is to the SFTPClient get method.
See also:
http://asyncssh.readthedocs.io/en/latest/api.html#asyncssh.SFTPClient.get
"""
def __init__(self, remotepaths, localpath,
*args,
label=None,
verbose=False,
# asyncssh's SFTP client get options
**kwds):
self.remotepaths = remotepaths
self.localpath = localpath
self.verbose = verbose
self.args = args
self.kwds = kwds
super().__init__(label=label)
def _remote_path(self):
paths = self.remotepaths
if isinstance(self.remotepaths, str):
paths = [paths]
if len(paths) == 1:
return paths[0]
else:
return f"{paths[0]} ... ({len(paths)} total)"
[docs]
def label_line(self):
return f"Pull: {self._remote_path()} into {self.localpath}"
[docs]
async def co_run_remote(self, node):
self._verbose_message(
node,
f"Pull: remotepaths={self.remotepaths}, localpath={self.localpath}")
await node.sftp_connect_lazy()
await node.get_file_s(self.remotepaths, self.localpath,
*self.args, **self.kwds)
self._verbose_message(node, "Pull done")
return 0
####
[docs]
class Push(AbstractCommand):
"""
Put local files onto target node
Parameters:
localpaths: a collection of local filenames to be copied
over to the remote end.
remotepath: the directory where to store copied on the remote end.
label: if set, is used to describe the command in scheduler graphs.
verbose (bool): be verbose.
kwds: passed as-is to the SFTPClient put method.
See also:
http://asyncssh.readthedocs.io/en/latest/api.html#asyncssh.SFTPClient.put
"""
def __init__(self, localpaths, remotepath,
*args,
label=None,
verbose=False,
**kwds):
self.localpaths = localpaths
self.remotepath = remotepath
self.verbose = verbose
self.args = args
self.kwds = kwds
super().__init__(label=label)
def _local_path(self):
paths = self.localpaths
if isinstance(self.localpaths, str):
paths = [paths]
if len(paths) == 1:
return paths[0]
else:
return f"{paths[0]} ... ({len(paths)} total)"
[docs]
def label_line(self):
return f"Push: {self._local_path()} onto {self.remotepath}"
[docs]
async def co_run_remote(self, node):
self._verbose_message(
node,
f"Push: localpaths={self.localpaths}, remotepath={self.remotepath}")
await node.sftp_connect_lazy()
await node.put_file_s(self.localpaths, self.remotepath,
*self.args, **self.kwds)
self._verbose_message(node, "Push done")
return 0