saltstack/salt

View on GitHub
salt/runners/jobs.py

Summary

Maintainability
F
4 days
Test Coverage
# -*- coding: utf-8 -*-
'''
A convenience system to manage jobs, both active and already run
'''

# Import python libs
from __future__ import absolute_import, print_function, unicode_literals
import fnmatch
import logging
import os

# Import salt libs
import salt.client
import salt.payload
import salt.utils.args
import salt.utils.files
import salt.utils.jid
import salt.minion
import salt.returners

# Import 3rd-party libs
from salt.ext import six
from salt.exceptions import SaltClientError

try:
    import dateutil.parser as dateutil_parser
    DATEUTIL_SUPPORT = True
except ImportError:
    DATEUTIL_SUPPORT = False

log = logging.getLogger(__name__)


def active(display_progress=False):
    '''
    Return a report on all actively running jobs from a job id centric
    perspective

    CLI Example:

    .. code-block:: bash

        salt-run jobs.active
    '''
    ret = {}
    client = salt.client.get_local_client(__opts__['conf_file'])
    try:
        active_ = client.cmd('*', 'saltutil.running', timeout=__opts__['timeout'])
    except SaltClientError as client_error:
        print(client_error)
        return ret

    if display_progress:
        __jid_event__.fire_event({
            'message': 'Attempting to contact minions: {0}'.format(list(active_.keys()))
            }, 'progress')
    for minion, data in six.iteritems(active_):
        if display_progress:
            __jid_event__.fire_event({'message': 'Received reply from minion {0}'.format(minion)}, 'progress')
        if not isinstance(data, list):
            continue
        for job in data:
            if not job['jid'] in ret:
                ret[job['jid']] = _format_jid_instance(job['jid'], job)
                ret[job['jid']].update({'Running': [{minion: job.get('pid', None)}], 'Returned': []})
            else:
                ret[job['jid']]['Running'].append({minion: job['pid']})

    mminion = salt.minion.MasterMinion(__opts__)
    for jid in ret:
        returner = _get_returner((__opts__['ext_job_cache'], __opts__['master_job_cache']))
        data = mminion.returners['{0}.get_jid'.format(returner)](jid)
        if data:
            for minion in data:
                if minion not in ret[jid]['Returned']:
                    ret[jid]['Returned'].append(minion)

    return ret


def lookup_jid(jid,
               ext_source=None,
               returned=True,
               missing=False,
               display_progress=False):
    '''
    Return the printout from a previously executed job

    jid
        The jid to look up.

    ext_source
        The external job cache to use. Default: `None`.

    returned : True
        If ``True``, include the minions that did return from the command.

        .. versionadded:: 2015.8.0

    missing : False
        If ``True``, include the minions that did *not* return from the
        command.

    display_progress : False
        If ``True``, fire progress events.

        .. versionadded:: 2015.5.0

    CLI Example:

    .. code-block:: bash

        salt-run jobs.lookup_jid 20130916125524463507
        salt-run jobs.lookup_jid 20130916125524463507 --out=highstate
    '''
    ret = {}
    mminion = salt.minion.MasterMinion(__opts__)
    returner = _get_returner((
        __opts__['ext_job_cache'],
        ext_source,
        __opts__['master_job_cache']
    ))

    try:
        data = list_job(
            jid,
            ext_source=ext_source,
            display_progress=display_progress
        )
    except TypeError:
        return ('Requested returner could not be loaded. '
                'No JIDs could be retrieved.')

    targeted_minions = data.get('Minions', [])
    returns = data.get('Result', {})

    if returns:
        for minion in returns:
            if display_progress:
                __jid_event__.fire_event({'message': minion}, 'progress')
            if u'return' in returns[minion]:
                if returned:
                    ret[minion] = returns[minion].get(u'return')
            else:
                if returned:
                    ret[minion] = returns[minion].get('return')
    if missing:
        for minion_id in (x for x in targeted_minions if x not in returns):
            ret[minion_id] = 'Minion did not return'

    # We need to check to see if the 'out' key is present and use it to specify
    # the correct outputter, so we get highstate output for highstate runs.
    try:
        # Check if the return data has an 'out' key. We'll use that as the
        # outputter in the absence of one being passed on the CLI.
        outputter = None
        _ret = returns[next(iter(returns))]
        if 'out' in _ret:
            outputter = _ret['out']
        elif 'outputter' in _ret.get('return', {}).get('return', {}):
            outputter = _ret['return']['return']['outputter']
    except (StopIteration, AttributeError):
        pass

    if outputter:
        return {'outputter': outputter, 'data': ret}
    else:
        return ret


def list_job(jid, ext_source=None, display_progress=False):
    '''
    List a specific job given by its jid

    ext_source
        If provided, specifies which external job cache to use.

    display_progress : False
        If ``True``, fire progress events.

        .. versionadded:: 2015.8.8

    CLI Example:

    .. code-block:: bash

        salt-run jobs.list_job 20130916125524463507
        salt-run jobs.list_job 20130916125524463507 --out=pprint
    '''
    ret = {'jid': jid}
    mminion = salt.minion.MasterMinion(__opts__)
    returner = _get_returner((
        __opts__['ext_job_cache'],
        ext_source,
        __opts__['master_job_cache']
    ))
    if display_progress:
        __jid_event__.fire_event(
            {'message': 'Querying returner: {0}'.format(returner)},
            'progress'
        )

    job = mminion.returners['{0}.get_load'.format(returner)](jid)
    ret.update(_format_jid_instance(jid, job))
    ret['Result'] = mminion.returners['{0}.get_jid'.format(returner)](jid)

    fstr = '{0}.get_endtime'.format(__opts__['master_job_cache'])
    if (__opts__.get('job_cache_store_endtime')
            and fstr in mminion.returners):
        endtime = mminion.returners[fstr](jid)
        if endtime:
            ret['EndTime'] = endtime

    return ret


def list_jobs(ext_source=None,
              outputter=None,
              search_metadata=None,
              search_function=None,
              search_target=None,
              start_time=None,
              end_time=None,
              display_progress=False):
    '''
    List all detectable jobs and associated functions

    ext_source
        If provided, specifies which external job cache to use.

    **FILTER OPTIONS**

    .. note::
        If more than one of the below options are used, only jobs which match
        *all* of the filters will be returned.

    search_metadata
        Specify a dictionary to match to the job's metadata. If any of the
        key-value pairs in this dictionary match, the job will be returned.
        Example:

        .. code-block:: bash

            salt-run jobs.list_jobs search_metadata='{"foo": "bar", "baz": "qux"}'

    search_function
        Can be passed as a string or a list. Returns jobs which match the
        specified function. Globbing is allowed. Example:

        .. code-block:: bash

            salt-run jobs.list_jobs search_function='test.*'
            salt-run jobs.list_jobs search_function='["test.*", "pkg.install"]'

        .. versionchanged:: 2015.8.8
            Multiple targets can now also be passed as a comma-separated list.
            For example:

            .. code-block:: bash

                salt-run jobs.list_jobs search_function='test.*,pkg.install'

    search_target
        Can be passed as a string or a list. Returns jobs which match the
        specified minion name. Globbing is allowed. Example:

        .. code-block:: bash

            salt-run jobs.list_jobs search_target='*.mydomain.tld'
            salt-run jobs.list_jobs search_target='["db*", "myminion"]'

        .. versionchanged:: 2015.8.8
            Multiple targets can now also be passed as a comma-separated list.
            For example:

            .. code-block:: bash

                salt-run jobs.list_jobs search_target='db*,myminion'

    start_time
        Accepts any timestamp supported by the dateutil_ Python module (if this
        module is not installed, this argument will be ignored). Returns jobs
        which started after this timestamp.

    end_time
        Accepts any timestamp supported by the dateutil_ Python module (if this
        module is not installed, this argument will be ignored). Returns jobs
        which started before this timestamp.

    .. _dateutil: https://pypi.python.org/pypi/python-dateutil

    CLI Example:

    .. code-block:: bash

        salt-run jobs.list_jobs
        salt-run jobs.list_jobs search_function='test.*' search_target='localhost' search_metadata='{"bar": "foo"}'
        salt-run jobs.list_jobs start_time='2015, Mar 16 19:00' end_time='2015, Mar 18 22:00'

    '''
    returner = _get_returner((
        __opts__['ext_job_cache'],
        ext_source,
        __opts__['master_job_cache']
    ))
    if display_progress:
        __jid_event__.fire_event(
            {'message': 'Querying returner {0} for jobs.'.format(returner)},
            'progress'
        )
    mminion = salt.minion.MasterMinion(__opts__)

    ret = mminion.returners['{0}.get_jids'.format(returner)]()

    mret = {}
    for item in ret:
        _match = True
        if search_metadata:
            _match = False
            if 'Metadata' in ret[item]:
                if isinstance(search_metadata, dict):
                    for key in search_metadata:
                        if key in ret[item]['Metadata']:
                            if ret[item]['Metadata'][key] == search_metadata[key]:
                                _match = True
                else:
                    log.info('The search_metadata parameter must be specified'
                             ' as a dictionary.  Ignoring.')
        if search_target and _match:
            _match = False
            if 'Target' in ret[item]:
                targets = ret[item]['Target']
                if isinstance(targets, six.string_types):
                    targets = [targets]
                for target in targets:
                    for key in salt.utils.args.split_input(search_target):
                        if fnmatch.fnmatch(target, key):
                            _match = True

        if search_function and _match:
            _match = False
            if 'Function' in ret[item]:
                for key in salt.utils.args.split_input(search_function):
                    if fnmatch.fnmatch(ret[item]['Function'], key):
                        _match = True

        if start_time and _match:
            _match = False
            if DATEUTIL_SUPPORT:
                parsed_start_time = dateutil_parser.parse(start_time)
                _start_time = dateutil_parser.parse(ret[item]['StartTime'])
                if _start_time >= parsed_start_time:
                    _match = True
            else:
                log.error(
                    '\'dateutil\' library not available, skipping start_time '
                    'comparison.'
                )

        if end_time and _match:
            _match = False
            if DATEUTIL_SUPPORT:
                parsed_end_time = dateutil_parser.parse(end_time)
                _start_time = dateutil_parser.parse(ret[item]['StartTime'])
                if _start_time <= parsed_end_time:
                    _match = True
            else:
                log.error(
                    '\'dateutil\' library not available, skipping end_time '
                    'comparison.'
                )

        if _match:
            mret[item] = ret[item]

    if outputter:
        return {'outputter': outputter, 'data': mret}
    else:
        return mret


def list_jobs_filter(count,
                     filter_find_job=True,
                     ext_source=None,
                     outputter=None,
                     display_progress=False):
    '''
    List all detectable jobs and associated functions

    ext_source
        The external job cache to use. Default: `None`.

    CLI Example:

    .. code-block:: bash

        salt-run jobs.list_jobs_filter 50
        salt-run jobs.list_jobs_filter 100 filter_find_job=False

    '''
    returner = _get_returner((
        __opts__['ext_job_cache'],
        ext_source,
        __opts__['master_job_cache']
    ))
    if display_progress:
        __jid_event__.fire_event(
            {'message': 'Querying returner {0} for jobs.'.format(returner)},
            'progress'
        )
    mminion = salt.minion.MasterMinion(__opts__)

    fun = '{0}.get_jids_filter'.format(returner)
    if fun not in mminion.returners:
        raise NotImplementedError(
            '\'{0}\' returner function not implemented yet.'.format(fun)
        )
    ret = mminion.returners[fun](count, filter_find_job)

    if outputter:
        return {'outputter': outputter, 'data': ret}
    else:
        return ret


def print_job(jid, ext_source=None):
    '''
    Print a specific job's detail given by it's jid, including the return data.

    CLI Example:

    .. code-block:: bash

        salt-run jobs.print_job 20130916125524463507
    '''
    ret = {}

    returner = _get_returner((
        __opts__['ext_job_cache'],
        ext_source,
        __opts__['master_job_cache']
    ))
    mminion = salt.minion.MasterMinion(__opts__)

    try:
        job = mminion.returners['{0}.get_load'.format(returner)](jid)
        ret[jid] = _format_jid_instance(jid, job)
    except TypeError:
        ret[jid]['Result'] = (
            'Requested returner {0} is not available. Jobs cannot be '
            'retrieved. Check master log for details.'.format(returner)
        )
        return ret
    ret[jid]['Result'] = mminion.returners['{0}.get_jid'.format(returner)](jid)

    fstr = '{0}.get_endtime'.format(__opts__['master_job_cache'])
    if (__opts__.get('job_cache_store_endtime')
            and fstr in mminion.returners):
        endtime = mminion.returners[fstr](jid)
        if endtime:
            ret[jid]['EndTime'] = endtime

    return ret


def exit_success(jid, ext_source=None):
    '''
    Check if a job has been executed and exit successfully

    jid
        The jid to look up.
    ext_source
        The external job cache to use. Default: `None`.

    CLI Example:

    .. code-block:: bash

        salt-run jobs.exit_success 20160520145827701627
    '''
    ret = dict()

    data = list_job(
        jid,
        ext_source=ext_source
    )

    minions = data.get('Minions', [])
    result = data.get('Result', {})

    for minion in minions:
        if minion in result and 'return' in result[minion]:
            ret[minion] = True if result[minion]['return'] else False
        else:
            ret[minion] = False

    for minion in result:
        if 'return' in result[minion] and result[minion]['return']:
            ret[minion] = True
    return ret


def last_run(ext_source=None,
             outputter=None,
             metadata=None,
             function=None,
             target=None,
             display_progress=False):
    '''
    .. versionadded:: 2015.8.0

    List all detectable jobs and associated functions

    CLI Example:

    .. code-block:: bash

        salt-run jobs.last_run
        salt-run jobs.last_run target=nodename
        salt-run jobs.last_run function='cmd.run'
        salt-run jobs.last_run metadata="{'foo': 'bar'}"
    '''

    if metadata:
        if not isinstance(metadata, dict):
            log.info('The metadata parameter must be specified as a dictionary')
            return False

    _all_jobs = list_jobs(ext_source=ext_source,
                          outputter=outputter,
                          search_metadata=metadata,
                          search_function=function,
                          search_target=target,
                          display_progress=display_progress)
    if _all_jobs:
        last_job = sorted(_all_jobs)[-1]
        return print_job(last_job, ext_source)
    else:
        return False


def _get_returner(returner_types):
    '''
    Helper to iterate over returner_types and pick the first one
    '''
    for returner in returner_types:
        if returner and returner is not None:
            return returner


def _format_job_instance(job):
    '''
    Helper to format a job instance
    '''
    if not job:
        ret = {'Error': 'Cannot contact returner or no job with this jid'}
        return ret

    ret = {'Function': job.get('fun', 'unknown-function'),
           'Arguments': list(job.get('arg', [])),
           # unlikely but safeguard from invalid returns
           'Target': job.get('tgt', 'unknown-target'),
           'Target-type': job.get('tgt_type', 'list'),
           'User': job.get('user', 'root')}

    if 'metadata' in job:
        ret['Metadata'] = job.get('metadata', {})
    else:
        if 'kwargs' in job:
            if 'metadata' in job['kwargs']:
                ret['Metadata'] = job['kwargs'].get('metadata', {})

    if 'Minions' in job:
        ret['Minions'] = job['Minions']
    return ret


def _format_jid_instance(jid, job):
    '''
    Helper to format jid instance
    '''
    ret = _format_job_instance(job)
    ret.update({'StartTime': salt.utils.jid.jid_to_time(jid)})
    return ret


def _walk_through(job_dir, display_progress=False):
    '''
    Walk through the job dir and return jobs
    '''
    serial = salt.payload.Serial(__opts__)

    for top in os.listdir(job_dir):
        t_path = os.path.join(job_dir, top)

        for final in os.listdir(t_path):
            load_path = os.path.join(t_path, final, '.load.p')
            with salt.utils.files.fopen(load_path, 'rb') as rfh:
                job = serial.load(rfh)

            if not os.path.isfile(load_path):
                continue

            with salt.utils.files.fopen(load_path, 'rb') as rfh:
                job = serial.load(rfh)
            jid = job['jid']
            if display_progress:
                __jid_event__.fire_event(
                    {'message': 'Found JID {0}'.format(jid)},
                    'progress'
                )
            yield jid, job, t_path, final