# krux/io.py
# Copyright 2013-2020 Salesforce.com, inc.
"""
This module provides tools for handling IO operations, like running
external commands.

Usage::

    from krux.io import IO

    io = IO()
    cmd = io.run_cmd(command='echo 42')
    if cmd.ok:
        ...
    else:
        print(cmd.stderr)
"""
from __future__ import generator_stop
import re
import shlex
import signal
import subprocess
import krux.cli
import krux.logging
import krux.stats
from krux.util import hasmethod
# Return code recorded when running a command raised an exception and no
# process exit code is available. Normal exit codes are 0-255, so 256 cannot
# collide with a real one.
RUN_COMMAND_EXCEPTION_EXIT_CODE = 256
class RunCmdError(Exception):
    """Raised by IO.run_cmd() when a command fails and raise_exception is set."""
class IORunCmd(object):
    """
    Krux base class running command line programs

    This should only ever be accessed via IO().run_cmd()

    After :py:meth:`run` returns, the outcome of the command is available
    through these attributes:

    * ``command``: the command as it was passed to :py:meth:`run`
    * ``returncode``: the process exit code, or
      ``RUN_COMMAND_EXCEPTION_EXIT_CODE`` if an exception occurred before an
      exit code was recorded
    * ``ok``: True if the command exited with code 0
    * ``stdout`` / ``stderr``: lists of decoded output lines that did not
      match any filter
    * ``exception``: the exception caught while running, if any
    """

    def __init__(self, logger, stats):
        """
        :param logger: logger used for diagnostics (debug/info/warning/critical)
        :param stats: stats client; ``incr`` is called when a command fails
            with an exception
        """
        self.___logger = logger
        self.___stats = stats

        # properties of the finished command:
        self.command = None
        self.returncode = 0
        self.ok = True
        self.stdout = []
        self.stderr = []
        self.exception = None

    @staticmethod
    def _is_regex(thing):
        """Return True if ``thing`` looks like a compiled regular expression."""
        return hasattr(thing, 'pattern') and hasmethod(thing, 'search')

    @staticmethod
    def _filter_lines(raw, filters, log):
        """Decode ``raw`` bytes into lines and drop those matching any filter."""
        kept = []
        for raw_line in raw.splitlines():
            # Replace undecodable bytes instead of raising: a command that
            # emits non-UTF-8 output must not be reported as a failed command.
            line = raw_line.decode('utf-8', errors='replace')

            ignore = False
            for regex in filters:
                # there was output, but you wanted it filtered
                if regex.search(line):
                    log.debug('line "%s" matches "%s"', line, regex.pattern)
                    ignore = True

            # not filtered - keep it so the caller can inspect it
            if not ignore:
                kept.append(line)
        return kept

    def run(self, command, filters=None, timeout=None, timeout_terminate_signal=signal.SIGTERM):
        """
        Run ``command`` and record its outcome on this object.

        :param command: the command, either as a list of arguments or as a
            string, which is split with :py:func:`shlex.split`
        :param filters: optional list of strings or compiled regular
            expressions; output lines matching any of them are neither stored
            nor logged
        :param timeout: optional number of seconds to wait for the command to
            finish
        :param timeout_terminate_signal: signal sent to the process when the
            timeout expires
        :returns: True if the command exited with code 0, False otherwise
            (including when an exception, such as a timeout, occurred)
        :raises ValueError: if a string command cannot be split by shlex
        """
        log = self.___logger
        stats = self.___stats

        # for bookkeeping purposes; reset everything so that results of an
        # earlier run() on this object cannot leak into this one
        self.command = command
        self.returncode = 0
        self.ok = True
        self.stdout = []
        self.stderr = []
        self.exception = None

        # lazy logger arguments: %-formatting here would fail on a tuple command
        log.debug('About to run: %s', command)

        # figure out if we've been passed a regex or a string for filtering
        if filters:
            filters = [f if self._is_regex(f) else re.compile(f) for f in filters]
        else:
            filters = []

        log.debug('Applying output filters: %s', [r.pattern for r in filters])

        # if the command is a string, split it using shlex; which correctly handles quoted args like:
        # cat "/var/log/foo bar.log" /var/log/baz\ .log
        log.debug('command isinstance str: %s', isinstance(command, str))
        if isinstance(command, str):
            command = shlex.split(command)

        try:
            process = subprocess.Popen(
                command,
                stderr=subprocess.PIPE,
                stdout=subprocess.PIPE,
                shell=False
            )

            # Note that communicate() buffers all output in memory, so this is
            # not suitable for commands producing very large or unbounded output.
            try:
                stdout, stderr = process.communicate(timeout=timeout)
            except subprocess.TimeoutExpired:
                # stop the process, then communicate() again to drain the
                # pipes and reap it before reporting the timeout
                process.send_signal(timeout_terminate_signal)
                process.communicate()
                raise

            # set the bookkeeping variables.
            # the exit code is set on the process; communicate doesn't provide
            # the exit code, just the outputs. So check it here. For details:
            # https://docs.python.org/3.3/library/subprocess.html#subprocess.Popen.communicate
            self.returncode = process.returncode
            self.ok = not self.returncode

            # print diagnostics if needed
            streams = (
                # label            log function  output   captured lines
                ('Command output', log.info, stdout, self.stdout),
                ('Command errors', log.warning, stderr, self.stderr),
            )
            for label, emit, raw, captured in streams:
                captured.extend(self._filter_lines(raw, filters, log))

                # print the entire (filtered) output buffer
                if captured:
                    emit('%s: %s', label, "\n".join(captured))

            return self.ok

        # The command failed
        except Exception as err:  # pylint: disable=broad-except
            stats.incr('error.run_cmd')
            log.critical('Command failed: %s', err)

            # we're definitely in trouble, and don't know how
            # far we've gotten, so just set them all again
            self.exception = err
            self.ok = False
            # 0-255 is Normal Exitcodes
            self.returncode = self.returncode or RUN_COMMAND_EXCEPTION_EXIT_CODE

            return False
class IO(object):
    """
    Krux base class for IO interactions
    """

    def __init__(self, logger=None, stats=None):
        """
        Wraps :py:class:`object` and provides IO routines

        :keyword logger: The logger utility. Defaults to
                         :py:func:`logging.get_logger <krux.logging.get_logger>`
        :keyword stats: The stats utility. Defaults to
                        :py:func:`stats.get_stats <krux.stats.get_stats>`
        """
        # You shouldn't make us get the default ones, but instead use
        # this via krux.cli which will have a fully instantiated object
        self.__name = krux.cli.get_script_name()
        self.logger = logger if logger else krux.logging.get_logger(name=self.__name)
        self.stats = stats if stats else krux.stats.get_stats(prefix='io.%s' % self.__name)

    def run_cmd(self, *args, raise_exception=False, **kwargs):
        """
        Dispatches to :py:class:`IORunCmd` run() method. This is the
        preferred way to run an external command.

        :keyword command: The command to run, either as a list or as a string.
                          If the latter, it will be split using shell-like
                          syntax (:py:func:`shlex.split`). Required argument.
        :keyword filters: list of output filters to apply to stdout/stderr before
                          capturing or logging it. Filters can be strings or
                          regular expressions.
        :keyword timeout: number of seconds to wait for the command to finish.
                          Defaults to None.
        :keyword timeout_terminate_signal: signal sent to the process when the
                          timeout expires. Defaults to ``signal.SIGTERM``.
        :keyword raise_exception: If an error occurs, a :py:class:`RunCmdError`
                                  exception will be thrown. Defaults to False.
        :returns: the :py:class:`IORunCmd` object, so it can be inspected
        :raises RunCmdError: if the command failed and ``raise_exception`` is set
        """
        cmd = IORunCmd(logger=self.logger, stats=self.stats)

        # true/false on success/failure
        succeeded = cmd.run(*args, **kwargs)

        # you want us to just throw exception?
        if raise_exception and not succeeded:
            # Chain the underlying exception (if any): when the command never
            # produced stderr (e.g. it could not be started) the cause is the
            # only description of what went wrong.
            raise RunCmdError(
                "Failed command '%s' (%d): %s" %
                (cmd.command, cmd.returncode, ' '.join(cmd.stderr))
            ) from cmd.exception

        # return the IORunCmd object, so it can be inspected
        return cmd
def main():
    """
    Stand-alone smoke test: run ``false`` and print what was recorded.
    """
    io = IO(logger=krux.logging.get_logger(name='io-test-app', level='debug'))
    result = io.run_cmd(command="false", raise_exception=False)

    # dump the outcome attributes in a fixed order
    for attribute in ('ok', 'returncode', 'stdout', 'stderr'):
        print(getattr(result, attribute))


if __name__ == '__main__':
    main()