saltstack/salt

View on GitHub
salt/utils/timed_subprocess.py

Summary

Maintainability
C
1 day
Test Coverage
# -*- coding: utf-8 -*-
'''
For running command line executables with a timeout
'''
from __future__ import absolute_import, print_function, unicode_literals

import shlex
import subprocess
import threading
import salt.exceptions
import salt.utils.data
from salt.ext import six


class TimedProc(object):
    '''
    Create a TimedProc object, calls subprocess.Popen with passed args and **kwargs
    '''
    def __init__(self, args, **kwargs):

        self.wait = not kwargs.pop('bg', False)
        self.stdin = kwargs.pop('stdin', None)
        self.with_communicate = kwargs.pop('with_communicate', self.wait)
        self.timeout = kwargs.pop('timeout', None)
        self.stdin_raw_newlines = kwargs.pop('stdin_raw_newlines', False)

        # If you're not willing to wait for the process
        # you can't define any stdin, stdout or stderr
        if not self.wait:
            self.stdin = kwargs['stdin'] = None
            self.with_communicate = False
        elif self.stdin is not None:
            if not self.stdin_raw_newlines:
                # Translate a newline submitted as '\n' on the CLI to an actual
                # newline character.
                self.stdin = salt.utils.stringutils.to_bytes(self.stdin.replace('\\n', '\n'))
            kwargs['stdin'] = subprocess.PIPE

        if not self.with_communicate:
            self.stdout = kwargs['stdout'] = None
            self.stderr = kwargs['stderr'] = None

        if self.timeout and not isinstance(self.timeout, (int, float)):
            raise salt.exceptions.TimedProcTimeoutError('Error: timeout {0} must be a number'.format(self.timeout))
        if kwargs.get('shell', False):
            args = salt.utils.data.decode(args, to_str=True)

        try:
            self.process = subprocess.Popen(args, **kwargs)
        except (AttributeError, TypeError):
            if not kwargs.get('shell', False):
                if not isinstance(args, (list, tuple)):
                    try:
                        args = shlex.split(args)
                    except AttributeError:
                        args = shlex.split(six.text_type(args))
                str_args = []
                for arg in args:
                    if not isinstance(arg, six.string_types):
                        str_args.append(six.text_type(arg))
                    else:
                        str_args.append(arg)
                args = str_args
            else:
                if not isinstance(args, (list, tuple, six.string_types)):
                    # Handle corner case where someone does a 'cmd.run 3'
                    args = six.text_type(args)
            # Ensure that environment variables are strings
            for key, val in six.iteritems(kwargs.get('env', {})):
                if not isinstance(val, six.string_types):
                    kwargs['env'][key] = six.text_type(val)
                if not isinstance(key, six.string_types):
                    kwargs['env'][six.text_type(key)] = kwargs['env'].pop(key)
            if six.PY2 and 'env' in kwargs:
                # Ensure no unicode in custom env dict, as it can cause
                # problems with subprocess.
                kwargs['env'] = salt.utils.data.encode_dict(kwargs['env'])
            args = salt.utils.data.decode(args)
            self.process = subprocess.Popen(args, **kwargs)
        self.command = args

    def run(self):
        '''
        wait for subprocess to terminate and return subprocess' return code.
        If timeout is reached, throw TimedProcTimeoutError
        '''
        def receive():
            if self.with_communicate:
                self.stdout, self.stderr = self.process.communicate(input=self.stdin)
            elif self.wait:
                self.process.wait()

        if not self.timeout:
            receive()
        else:
            rt = threading.Thread(target=receive)
            rt.start()
            rt.join(self.timeout)
            if rt.isAlive():
                # Subprocess cleanup (best effort)
                self.process.kill()

                def terminate():
                    if rt.isAlive():
                        self.process.terminate()
                threading.Timer(10, terminate).start()
                raise salt.exceptions.TimedProcTimeoutError(
                    '{0} : Timed out after {1} seconds'.format(
                        self.command,
                        six.text_type(self.timeout),
                    )
                )
        return self.process.returncode