"""
The :class:`SshNode` and :class:`LocalNode` classes are designed
as companions to the :class:`~apssh.sshjob.SshJob` class, which needs
a ``node`` attribute to describe on which node to run commands.
"""
import asyncio
import os
from subprocess import PIPE, DEVNULL
from pathlib import Path
from asyncssh import EXTENDED_DATA_STDERR
from .formatters import HostFormatter
from .sshproxy import SshProxy
from .keys import load_private_keys, load_agent_keys
[docs]
class LocalNode:
    """
    For convenience and consistency, this class can be used
    as the ``node`` attribute of a :class:`~apssh.sshjob.SshJob` object,
    so as to define a set of commands to run locally.

    Parameters:
      formatter: a formatter instance, default to an instance of
        ``HostFormatter``;
      verbose: if provided, passed to the formatter instance

    Examples:
      To create a job that runs 2 commands locally::

        SshJob(node=LocalNode(),
               commands = [
                   Run("cat /etc/motd"),
                   Run("sleep 10"),
               ])

    .. note::
      Not all command classes support running on a local node, essentially
      this is only available for usual ``Run`` commands as of this writing.
    """

    def __init__(self, formatter=None, verbose=None):
        self.formatter = formatter or HostFormatter()
        if verbose is not None:
            self.formatter.verbose = verbose
        # could be improved
        self.hostname = "LOCALNODE"
        # some users reported issues with os.getlogin(), so
        # given that the username is really only for convenience
        # let's do this best effort
        try:
            self.username = os.getlogin()
        except Exception:                               # pylint: disable=w0703
            self.username = "LOCALUSER"

    def lines(self, bytes_chunk, datatype):
        """
        Decode a chunk of bytes and hand it to the formatter,
        one newline-terminated line at a time.

        Parameters:
          bytes_chunk: the raw bytes to dispatch; an empty chunk
            produces no output
          datatype: passed as-is to the formatter's ``line()`` method
        """
        # encoding should probably not be hard-wired;
        # undecodable bytes are replaced rather than raising, so that
        # a command with non-UTF-8 output does not abort the reading loop
        str_chunk = bytes_chunk.decode("utf-8", errors="replace")
        if not str_chunk:
            return
        # drop a single trailing newline so that split() does not
        # yield a spurious empty last line
        if str_chunk.endswith("\n"):
            str_chunk = str_chunk[:-1]
        for line in str_chunk.split("\n"):
            self.formatter.line(line + "\n", datatype, self.hostname)

    # from this clue here
    # https://stackoverflow.com/questions/17190221/subprocess-popen\
    # -cloning-stdout-and-stderr-both-to-terminal-and-variables/\
    # 25960956#25960956
    async def read_and_display(self, stream, datatype):
        """
        read (process stdout or stderr) stream line by line
        until EOF and dispatch lines in formatter
        - using self.lines(... datatype)
        """
        while True:
            line = await stream.readline()
            # an empty read means EOF
            if not line:
                return
            self.lines(line, datatype)

    async def run(self, command, *, ignore_outputs=False, cwd=None):
        """
        Run a command locally through the shell, and wait for it to finish.

        Parameters:
          command: the shell command line to run
          ignore_outputs: when true, the command's stdout and stderr are
            sent to ``DEVNULL`` and a notice is emitted through the formatter;
            otherwise both streams are read and dispatched to the formatter
          cwd: if not None, passed as ``cwd=`` to
            ``asyncio.create_subprocess_shell``

        Returns:
          the process return code, or ``None`` if anything went wrong
          while spawning or monitoring the command, in which case
          a message is emitted through the formatter.
        """
        # pass cwd= to create_subprocess_shell when cwd is provided
        kwds = {}
        if cwd is not None:
            kwds['cwd'] = cwd
        try:
            if ignore_outputs:
                process = await asyncio.create_subprocess_shell(
                    command, stdout=DEVNULL, stderr=DEVNULL, **kwds)
                # nothing to read
                self.lines(f"IGNORING (ignore_outputs=True) with `{command}`".encode(),
                           EXTENDED_DATA_STDERR)
            else:
                process = await asyncio.create_subprocess_shell(
                    command, stdout=PIPE, stderr=PIPE, **kwds)
                # multiplex stdout and stderr on the terminal
                await asyncio.gather(
                    self.read_and_display(process.stdout, 0),
                    self.read_and_display(process.stderr, EXTENDED_DATA_STDERR))
            return await process.wait()
        except Exception as exc:                        # pylint: disable=w0703
            line = f"LocalNode: Could not run local command {command} - {exc}"
            # newline-terminated, like everything that lines() sends
            # to the formatter
            self.formatter.line(line + "\n", EXTENDED_DATA_STDERR, self.hostname)
            return None

    async def close(self):
        """
        Nothing to release on a local node; this method does nothing.
        """
# ========== SshNode == SshProxy
# Use apssh's SshProxy mostly as-is, except for keys handling.
# Note that this implementation relies on formatters, which probably
# need more work: in particular it works fine only with remote
# processes whose output is text-based.
# For now this class is mostly a matter of exposing a more meaningful
# name in this context; it might need a dedicated formatter at some point.
[docs]
class SshNode(SshProxy):
    """
    The remote counterpart of a local node: an `SshNode` instance
    is what one typically passes as the ``node`` of a
    :class:`apssh.sshjob.SshJob`, which in turn describes a batch of
    commands or file transfers to run in sequence on that node.

    Examples:
      Running 2 commands remotely typically looks like this::

        remote_node = SshNode('remote.foo.com', username='tutu')
        SshJob(node=remote_node,
               commands = [
                   Run("cat /etc/motd"),
                   Run("sleep 10"),
               ])

    `SshNode` is a very thin specialization of
    :class:`~apssh.sshproxy.SshProxy`; it only differs in the
    default values applied at build time.

    Parameters:
      hostname: remote node's hostname
      username: ``root`` is used when unspecified; note that
        :class:`~apssh.sshproxy.SshProxy` would default to
        the local username instead
      keys: filenames for the private keys to use when authenticating;
        when none are given, the keys currently loaded in the ssh agent
        are used; if that yields nothing either, `SshNode` falls back
        to the default ssh keys located
        in ``~/.ssh/id_rsa`` and ``~/.ssh/id_dsa``.
      kwds: forwarded untouched to :class:`~apssh.sshproxy.SshProxy`.
    """

    def __init__(self, hostname, *, username=None, keys=None, **kwds):
        effective_username = "root" if username is None else username
        # explicit keys first, then the ssh agent, then the default key files;
        # each fallback is only tried when the previous step came up empty
        effective_keys = keys or load_agent_keys() or load_private_keys()
        SshProxy.__init__(self, hostname,
                          username=effective_username,
                          keys=effective_keys,
                          **kwds)

    def distance(self):
        """
        Returns:
          int: how many hops away from the local node this node is.
          A node with no `gateway` is at distance 1;
          any other node is one hop further than its gateway.
        """
        return 1 + self.gateway.distance() if self.gateway else 1