saltstack/salt

View on GitHub
salt/modules/dpkg_lowpkg.py

Summary

Maintainability
F
5 days
Test Coverage
# -*- coding: utf-8 -*-
'''
Support for DEB packages
'''
from __future__ import absolute_import, print_function, unicode_literals

# Import python libs
import logging
import os
import re
import datetime

# Import salt libs
import salt.utils.args
import salt.utils.data
import salt.utils.files
import salt.utils.path
import salt.utils.stringutils
from salt.exceptions import CommandExecutionError, SaltInvocationError

log = logging.getLogger(__name__)

# Define the module's virtual name
__virtualname__ = 'lowpkg'


def __virtual__():
    '''
    Confirm this module is on a Debian based system
    '''
    if __grains__.get('os_family') == 'Debian':
        return __virtualname__
    return (False, 'The dpkg execution module cannot be loaded: '
            'only works on Debian family systems.')


def bin_pkg_info(path, saltenv='base'):
    '''
    .. versionadded:: 2015.8.0

    Parses RPM metadata and returns a dictionary of information about the
    package (name, version, etc.).

    path
        Path to the file. Can either be an absolute path to a file on the
        minion, or a salt fileserver URL (e.g. ``salt://path/to/file.rpm``).
        If a salt fileserver URL is passed, the file will be cached to the
        minion so that it can be examined.

    saltenv : base
        Salt fileserver envrionment from which to retrieve the package. Ignored
        if ``path`` is a local file path on the minion.

    CLI Example:

    .. code-block:: bash

        salt '*' lowpkg.bin_pkg_info /root/foo-1.2.3-1ubuntu1_all.deb
        salt '*' lowpkg.bin_pkg_info salt://foo-1.2.3-1ubuntu1_all.deb
    '''
    # If the path is a valid protocol, pull it down using cp.cache_file
    if __salt__['config.valid_fileproto'](path):
        newpath = __salt__['cp.cache_file'](path, saltenv)
        if not newpath:
            raise CommandExecutionError(
                'Unable to retrieve {0} from saltenv \'{1}\''
                .format(path, saltenv)
            )
        path = newpath
    else:
        if not os.path.exists(path):
            raise CommandExecutionError(
                '{0} does not exist on minion'.format(path)
            )
        elif not os.path.isabs(path):
            raise SaltInvocationError(
                '{0} does not exist on minion'.format(path)
            )

    cmd = ['dpkg', '-I', path]
    result = __salt__['cmd.run_all'](cmd, output_loglevel='trace')
    if result['retcode'] != 0:
        msg = 'Unable to get info for ' + path
        if result['stderr']:
            msg += ': ' + result['stderr']
        raise CommandExecutionError(msg)

    ret = {}
    for line in result['stdout'].splitlines():
        line = line.strip()
        if line.startswith('Package:'):
            ret['name'] = line.split()[-1]
        elif line.startswith('Version:'):
            ret['version'] = line.split()[-1]
        elif line.startswith('Architecture:'):
            ret['arch'] = line.split()[-1]

    missing = [x for x in ('name', 'version', 'arch') if x not in ret]
    if missing:
        raise CommandExecutionError(
            'Unable to get {0} for {1}'.format(', '.join(missing), path)
        )

    if __grains__.get('cpuarch', '') == 'x86_64':
        osarch = __grains__.get('osarch', '')
        arch = ret['arch']
        if arch != 'all' and osarch == 'amd64' and osarch != arch:
            ret['name'] += ':{0}'.format(arch)

    return ret


def unpurge(*packages):
    '''
    Change package selection for each package specified to 'install'

    CLI Example:

    .. code-block:: bash

        salt '*' lowpkg.unpurge curl
    '''
    if not packages:
        return {}
    old = __salt__['pkg.list_pkgs'](purge_desired=True)
    ret = {}
    __salt__['cmd.run'](
        ['dpkg', '--set-selections'],
        stdin=r'\n'.join(['{0} install'.format(x) for x in packages]),
        python_shell=False,
        output_loglevel='trace'
    )
    __context__.pop('pkg.list_pkgs', None)
    new = __salt__['pkg.list_pkgs'](purge_desired=True)
    return salt.utils.data.compare_dicts(old, new)


def list_pkgs(*packages, **kwargs):
    '''
    List the packages currently installed in a dict::

        {'<package_name>': '<version>'}

    External dependencies::

        Virtual package resolution requires aptitude. Because this function
        uses dpkg, virtual packages will be reported as not installed.

    CLI Example:

    .. code-block:: bash

        salt '*' lowpkg.list_pkgs
        salt '*' lowpkg.list_pkgs httpd
    '''
    pkgs = {}
    cmd = 'dpkg -l {0}'.format(' '.join(packages))
    out = __salt__['cmd.run_all'](cmd, python_shell=False)
    if out['retcode'] != 0:
        msg = 'Error:  ' + out['stderr']
        log.error(msg)
        return msg
    out = out['stdout']

    for line in out.splitlines():
        if line.startswith('ii '):
            comps = line.split()
            pkgs[comps[1]] = comps[2]
    return pkgs


def file_list(*packages, **kwargs):
    '''
    List the files that belong to a package. Not specifying any packages will
    return a list of _every_ file on the system's package database (not
    generally recommended).

    CLI Examples:

    .. code-block:: bash

        salt '*' lowpkg.file_list httpd
        salt '*' lowpkg.file_list httpd postfix
        salt '*' lowpkg.file_list
    '''
    errors = []
    ret = set([])
    pkgs = {}
    cmd = 'dpkg -l {0}'.format(' '.join(packages))
    out = __salt__['cmd.run_all'](cmd, python_shell=False)
    if out['retcode'] != 0:
        msg = 'Error:  ' + out['stderr']
        log.error(msg)
        return msg
    out = out['stdout']

    for line in out.splitlines():
        if line.startswith('ii '):
            comps = line.split()
            pkgs[comps[1]] = {'version': comps[2],
                              'description': ' '.join(comps[3:])}
        if 'No packages found' in line:
            errors.append(line)
    for pkg in pkgs:
        files = []
        cmd = 'dpkg -L {0}'.format(pkg)
        for line in __salt__['cmd.run'](cmd, python_shell=False).splitlines():
            files.append(line)
        fileset = set(files)
        ret = ret.union(fileset)
    return {'errors': errors, 'files': list(ret)}


def file_dict(*packages, **kwargs):
    '''
    List the files that belong to a package, grouped by package. Not
    specifying any packages will return a list of _every_ file on the system's
    package database (not generally recommended).

    CLI Examples:

    .. code-block:: bash

        salt '*' lowpkg.file_list httpd
        salt '*' lowpkg.file_list httpd postfix
        salt '*' lowpkg.file_list
    '''
    errors = []
    ret = {}
    pkgs = {}
    cmd = 'dpkg -l {0}'.format(' '.join(packages))
    out = __salt__['cmd.run_all'](cmd, python_shell=False)
    if out['retcode'] != 0:
        msg = 'Error:  ' + out['stderr']
        log.error(msg)
        return msg
    out = out['stdout']

    for line in out.splitlines():
        if line.startswith('ii '):
            comps = line.split()
            pkgs[comps[1]] = {'version': comps[2],
                              'description': ' '.join(comps[3:])}
        if 'No packages found' in line:
            errors.append(line)
    for pkg in pkgs:
        files = []
        cmd = 'dpkg -L {0}'.format(pkg)
        for line in __salt__['cmd.run'](cmd, python_shell=False).splitlines():
            files.append(line)
        ret[pkg] = files
    return {'errors': errors, 'packages': ret}


def _get_pkg_build_time(name):
    '''
    Get package build time, if possible.

    :param name:
    :return:
    '''
    iso_time = iso_time_t = None
    changelog_dir = os.path.join('/usr/share/doc', name)
    if os.path.exists(changelog_dir):
        for fname in os.listdir(changelog_dir):
            try:
                iso_time_t = int(os.path.getmtime(os.path.join(changelog_dir, fname)))
                iso_time = datetime.datetime.utcfromtimestamp(iso_time_t).isoformat() + 'Z'
                break
            except OSError:
                pass

    # Packager doesn't care about Debian standards, therefore Plan B: brute-force it.
    if not iso_time:
        for pkg_f_path in __salt__['cmd.run']('dpkg-query -L {}'.format(name)).splitlines():
            if 'changelog' in pkg_f_path.lower() and os.path.exists(pkg_f_path):
                try:
                    iso_time_t = int(os.path.getmtime(pkg_f_path))
                    iso_time = datetime.datetime.utcfromtimestamp(iso_time_t).isoformat() + 'Z'
                    break
                except OSError:
                    pass

    return iso_time, iso_time_t


def _get_pkg_info(*packages, **kwargs):
    '''
    Return list of package information. If 'packages' parameter is empty,
    then data about all installed packages will be returned.

    :param packages: Specified packages.
    :param failhard: Throw an exception if no packages found.
    :return:
    '''
    kwargs = salt.utils.args.clean_kwargs(**kwargs)
    failhard = kwargs.pop('failhard', True)
    if kwargs:
        salt.utils.args.invalid_kwargs(kwargs)

    if __grains__['os'] == 'Ubuntu' and __grains__['osrelease_info'] < (12, 4):
        bin_var = '${binary}'
    else:
        bin_var = '${Package}'

    ret = []
    cmd = "dpkg-query -W -f='package:" + bin_var + "\\n" \
          "revision:${binary:Revision}\\n" \
          "arch:${Architecture}\\n" \
          "maintainer:${Maintainer}\\n" \
          "summary:${Summary}\\n" \
          "source:${source:Package}\\n" \
          "version:${Version}\\n" \
          "section:${Section}\\n" \
          "installed_size:${Installed-size}\\n" \
          "size:${Size}\\n" \
          "MD5:${MD5sum}\\n" \
          "SHA1:${SHA1}\\n" \
          "SHA256:${SHA256}\\n" \
          "origin:${Origin}\\n" \
          "homepage:${Homepage}\\n" \
          "status:${db:Status-Abbrev}\\n" \
          "======\\n" \
          "description:${Description}\\n" \
          "------\\n'"
    cmd += ' {0}'.format(' '.join(packages))
    cmd = cmd.strip()

    call = __salt__['cmd.run_all'](cmd, python_chell=False)
    if call['retcode']:
        if failhard:
            raise CommandExecutionError("Error getting packages information: {0}".format(call['stderr']))
        else:
            return ret

    for pkg_info in [elm for elm in re.split(r"------", call['stdout']) if elm.strip()]:
        pkg_data = {}
        pkg_info, pkg_descr = re.split(r"======", pkg_info)
        for pkg_info_line in [el.strip() for el in pkg_info.split(os.linesep) if el.strip()]:
            key, value = pkg_info_line.split(":", 1)
            if value:
                pkg_data[key] = value
        install_date, install_date_t = _get_pkg_install_time(pkg_data.get('package'), pkg_data.get('arch'))
        if install_date:
            pkg_data['install_date'] = install_date
            pkg_data['install_date_time_t'] = install_date_t  # Unix ticks
        build_date, build_date_t = _get_pkg_build_time(pkg_data.get('package'))
        if build_date:
            pkg_data['build_date'] = build_date
            pkg_data['build_date_time_t'] = build_date_t
        pkg_data['description'] = pkg_descr.split(":", 1)[-1]
        ret.append(pkg_data)

    return ret


def _get_pkg_license(pkg):
    '''
    Try to get a license from the package.
    Based on https://www.debian.org/doc/packaging-manuals/copyright-format/1.0/

    :param pkg:
    :return:
    '''
    licenses = set()
    cpr = "/usr/share/doc/{0}/copyright".format(pkg)
    if os.path.exists(cpr):
        with salt.utils.files.fopen(cpr) as fp_:
            for line in salt.utils.stringutils.to_unicode(fp_.read()).split(os.linesep):
                if line.startswith("License:"):
                    licenses.add(line.split(":", 1)[1].strip())

    return ", ".join(sorted(licenses))


def _get_pkg_install_time(pkg, arch=None):
    '''
    Return package install time, based on the /var/lib/dpkg/info/<package>.list

    :return:
    '''
    iso_time = iso_time_t = None
    loc_root = '/var/lib/dpkg/info'
    if pkg is not None:
        locations = []
        if arch is not None and arch != 'all':
            locations.append(os.path.join(loc_root, '{0}:{1}.list'.format(pkg, arch)))

        locations.append(os.path.join(loc_root, '{0}.list'.format(pkg)))
        for location in locations:
            try:
                iso_time_t = int(os.path.getmtime(location))
                iso_time = datetime.datetime.utcfromtimestamp(iso_time_t).isoformat() + 'Z'
                break
            except OSError:
                pass

        if iso_time is None:
            log.debug('Unable to get package installation time for package "%s".', pkg)

    return iso_time, iso_time_t


def _get_pkg_ds_avail():
    '''
    Get the package information of the available packages, maintained by dselect.
    Note, this will be not very useful, if dselect isn't installed.

    :return:
    '''
    avail = "/var/lib/dpkg/available"
    if not salt.utils.path.which('dselect') or not os.path.exists(avail):
        return dict()

    # Do not update with dselect, just read what is.
    ret = dict()
    pkg_mrk = "Package:"
    pkg_name = "package"
    with salt.utils.files.fopen(avail) as fp_:
        for pkg_info in salt.utils.stringutils.to_unicode(fp_.read()).split(pkg_mrk):
            nfo = dict()
            for line in (pkg_mrk + pkg_info).split(os.linesep):
                line = line.split(": ", 1)
                if len(line) != 2:
                    continue
                key, value = line
                if value.strip():
                    nfo[key.lower()] = value
            if nfo.get(pkg_name):
                ret[nfo[pkg_name]] = nfo

    return ret


def info(*packages, **kwargs):
    '''
    Returns a detailed summary of package information for provided package names.
    If no packages are specified, all packages will be returned.

    .. versionadded:: 2015.8.1

    packages
        The names of the packages for which to return information.

    failhard
        Whether to throw an exception if none of the packages are installed.
        Defaults to True.

        .. versionadded:: 2016.11.3

    attr
        Comma-separated package attributes. If no 'attr' is specified, all available attributes returned.

        Valid attributes are:
            version, vendor, release, build_date, build_date_time_t, install_date, install_date_time_t,
            build_host, group, source_rpm, arch, epoch, size, license, signature, packager, url, summary, description.

        .. versionadded:: Neon

    CLI example:

    .. code-block:: bash

        salt '*' lowpkg.info
        salt '*' lowpkg.info apache2 bash
        salt '*' lowpkg.info 'php5*' failhard=false
    '''
    # Get the missing information from the /var/lib/dpkg/available, if it is there.
    # However, this file is operated by dselect which has to be installed.
    dselect_pkg_avail = _get_pkg_ds_avail()

    kwargs = salt.utils.args.clean_kwargs(**kwargs)
    failhard = kwargs.pop('failhard', True)
    attr = kwargs.pop('attr', None) or None
    if attr:
        attr = attr.split(',')

    if kwargs:
        salt.utils.args.invalid_kwargs(kwargs)

    ret = dict()
    for pkg in _get_pkg_info(*packages, failhard=failhard):
        # Merge extra information from the dselect, if available
        for pkg_ext_k, pkg_ext_v in dselect_pkg_avail.get(pkg['package'], {}).items():
            if pkg_ext_k not in pkg:
                pkg[pkg_ext_k] = pkg_ext_v
        # Remove "technical" keys
        for t_key in ['installed_size', 'depends', 'recommends',
                      'provides', 'replaces', 'conflicts', 'bugs',
                      'description-md5', 'task']:
            if t_key in pkg:
                del pkg[t_key]

        lic = _get_pkg_license(pkg['package'])
        if lic:
            pkg['license'] = lic

        # Remove keys that aren't in attrs
        pkg_name = pkg['package']
        if attr:
            for k in list(pkg.keys())[:]:
                if k not in attr:
                    del pkg[k]

        ret[pkg_name] = pkg

    return ret