salt/modules/aptly.py
# -*- coding: utf-8 -*-
'''
Aptly Debian repository manager.
.. versionadded:: 2018.3.0
'''
# Import python libs
from __future__ import absolute_import, print_function, unicode_literals
import logging
import os
import re
# Import salt libs
from salt.ext import six
from salt.exceptions import SaltInvocationError
import salt.utils.json
import salt.utils.path
import salt.utils.stringutils
# Configuration file path used whenever a caller does not pass ``config_path``.
_DEFAULT_CONFIG_PATH = '/etc/aptly.conf'
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Define the module's virtual name
__virtualname__ = 'aptly'
def __virtual__():
    '''
    Load this module only when an ``aptly`` executable is found on the path.

    :return: The virtual module name, or a ``(False, reason)`` tuple when the
        binary cannot be located.
    :rtype: Union[str, tuple]
    '''
    if not salt.utils.path.which('aptly'):
        return (False, 'The aptly binaries required cannot be found or are not installed.')
    return __virtualname__
def _cmd_run(cmd):
    '''
    Run the aptly command.

    :param list cmd: The aptly sub-command and its arguments, without the
        leading ``aptly`` binary name.

    :return: The string output (stdout) of the command.
    :rtype: str
    '''
    # Build a new list instead of inserting into the caller's list, so the
    # argument passed in is left untouched.
    full_cmd = ['aptly'] + list(cmd)
    cmd_ret = __salt__['cmd.run_all'](full_cmd, ignore_retcode=True)

    # A failing command is only logged; callers detect failure by inspecting
    # the (possibly empty) output that is returned.
    if cmd_ret['retcode'] != 0:
        log.debug('Unable to execute command: %s\nError: %s', full_cmd,
                  cmd_ret['stderr'])

    return cmd_ret['stdout']
def _convert_to_closest_type(value):
    '''
    Convert a string value to its closest non-string analog, if possible.

    The stripped value is first passed through ``to_bool``; if that does not
    yield a boolean, the result is passed through ``to_num`` and then
    ``to_none``.

    :param str value: The string value to attempt conversion on.

    :return: The converted value.
    :rtype: Union[bool, int, None, str]
    '''
    candidate = salt.utils.stringutils.to_bool(value.strip())

    if not isinstance(candidate, bool):
        numeric = salt.utils.stringutils.to_num(candidate)
        return salt.utils.stringutils.to_none(numeric)

    return candidate
def _format_repo_args(comment=None, component=None, distribution=None,
uploaders_file=None, saltenv='base'):
'''
Format the common arguments for creating or editing a repository.
:param str comment: The description of the repository.
:param str component: The default component to use when publishing.
:param str distribution: The default distribution to use when publishing.
:param str uploaders_file: The repository upload restrictions config.
:param str saltenv: The environment the file resides in.
:return: A list of the arguments formatted as aptly arguments.
:rtype: list
'''
ret = list()
cached_uploaders_path = None
settings = {'comment': comment, 'component': component,
'distribution': distribution}
if uploaders_file:
cached_uploaders_path = __salt__['cp.cache_file'](uploaders_file, saltenv)
if not cached_uploaders_path:
log.error('Unable to get cached copy of file: %s', uploaders_file)
return False
for setting in settings:
if settings[setting] is not None:
ret.append('-{}={}'.format(setting, settings[setting]))
if cached_uploaders_path:
ret.append('-uploaders-file={}'.format(cached_uploaders_path))
return ret
def _parse_show_output(cmd_ret):
    '''
    Parse the output of an aptly show command.

    Each non-indented ``Key: value`` line becomes an entry whose key is the
    lower-cased label with runs of whitespace replaced by a single underscore.
    Indented lines that follow a key are collected into a list stored under
    that key. The resulting dictionary is then passed through
    ``_convert_parsed_show_output`` before being returned.

    :param str cmd_ret: The text of the command output that needs to be parsed.

    :return: A dictionary containing the configuration data.
    :rtype: dict
    '''
    parsed_data = dict()
    # Name of the most recent key; indented lines are appended under it.
    list_key = None

    for line in cmd_ret.splitlines():
        # Skip empty lines.
        if not line.strip():
            continue

        # If there are indented lines below an existing key, assign them as a list value
        # for that key.
        # A line that does not start with whitespace ends any list in progress.
        if not salt.utils.stringutils.contains_whitespace(line[0]):
            list_key = None

        if list_key:
            list_value = _convert_to_closest_type(line)
            # NOTE(review): if the key already holds a scalar (a "Key: value" line
            # followed by indented lines), setdefault returns that scalar and
            # .append would raise AttributeError -- confirm aptly never emits this.
            parsed_data.setdefault(list_key, []).append(list_value)
            continue

        # Split on the first colon only, so values may themselves contain colons.
        items = [item.strip() for item in line.split(':', 1)]

        key = items[0].lower()
        # Collapse runs of whitespace in the label and join the words with underscores.
        key = ' '.join(key.split()).replace(' ', '_')

        # Track the current key so that we can use it in instances where the values
        # appear on subsequent lines of the output.
        list_key = key

        try:
            value = items[1]
        except IndexError:
            # If the line doesn't have the separator or is otherwise invalid, skip it.
            log.debug('Skipping line: %s', line)
            continue

        # Keys with an empty value are not stored here; they only appear in the
        # result if indented lines follow them.
        if value:
            parsed_data[key] = _convert_to_closest_type(value)

    return _convert_parsed_show_output(parsed_data=parsed_data)
def _convert_parsed_show_output(parsed_data):
'''
Convert matching string values to lists/dictionaries.
:param dict parsed_data: The text of the command output that needs to be parsed.
:return: A dictionary containing the modified configuration data.
:rtype: dict
'''
# Match lines like "main: xenial [snapshot]" or "test [local]".
source_pattern = re.compile(r'(?:(?P<component>\S+):)?\s*(?P<name>\S+)\s+\[(?P<type>\S+)\]')
sources = list()
if 'architectures' in parsed_data:
parsed_data['architectures'] = [item.strip() for item in parsed_data['architectures'].split()]
parsed_data['architectures'] = sorted(parsed_data['architectures'])
for source in parsed_data.get('sources', []):
# Retain the key/value of only the matching named groups.
matches = source_pattern.search(source)
if matches:
groups = matches.groupdict()
sources.append({key: groups[key] for key in groups if groups[key]})
if sources:
parsed_data['sources'] = sources
return parsed_data
def _validate_config(config_path):
    '''
    Ensure that the given aptly configuration path refers to an existing file.

    :param str config_path: The path to the configuration file for the aptly instance.

    :return: None
    :rtype: None

    :raises SaltInvocationError: If ``config_path`` is not an existing file.
    '''
    log.debug('Checking configuration file: %s', config_path)

    if os.path.isfile(config_path):
        log.debug('Found configuration file: %s', config_path)
        return

    message = 'Unable to get configuration file: {}'.format(config_path)
    log.error(message)
    raise SaltInvocationError(message)
def get_config(config_path=_DEFAULT_CONFIG_PATH):
    '''
    Return the aptly configuration, as reported by ``aptly config show``.

    :param str config_path: The path to the configuration file for the aptly instance.

    :return: A dictionary containing the configuration data.
    :rtype: dict

    CLI Example:

    .. code-block:: bash

        salt '*' aptly.get_config
    '''
    _validate_config(config_path)

    config_arg = '-config={}'.format(config_path)
    raw_config = _cmd_run(['config', 'show', config_arg])

    # The command output is decoded as JSON.
    return salt.utils.json.loads(raw_config)
def list_repos(config_path=_DEFAULT_CONFIG_PATH, with_packages=False):
    '''
    Collect the details of every local package repository.

    :param str config_path: The path to the configuration file for the aptly instance.
    :param bool with_packages: Return a list of packages in the repo.

    :return: A dictionary of the repositories, keyed by repository name.
    :rtype: dict

    CLI Example:

    .. code-block:: bash

        salt '*' aptly.list_repos
    '''
    _validate_config(config_path)

    cmd = ['repo', 'list', '-config={}'.format(config_path), '-raw=true']
    repo_names = [line.strip() for line in _cmd_run(cmd).splitlines()]

    log.debug('Found repositories: %s', len(repo_names))

    # Look up the details of each repository individually.
    return {
        repo_name: get_repo(name=repo_name, config_path=config_path,
                            with_packages=with_packages)
        for repo_name in repo_names
    }
def get_repo(name, config_path=_DEFAULT_CONFIG_PATH, with_packages=False):
    '''
    Return the parsed ``aptly repo show`` output for a local package repository.

    :param str name: The name of the local repository.
    :param str config_path: The path to the configuration file for the aptly instance.
    :param bool with_packages: Return a list of packages in the repo.

    :return: A dictionary containing information about the repository. The
        dictionary is empty when nothing could be parsed from the output.
    :rtype: dict

    CLI Example:

    .. code-block:: bash

        salt '*' aptly.get_repo name="test-repo"
    '''
    _validate_config(config_path)

    # Normalise any truthy/falsy input to the literal "true"/"false".
    packages_flag = six.text_type(bool(with_packages)).lower()

    cmd = ['repo', 'show', '-config={}'.format(config_path),
           '-with-packages={}'.format(packages_flag), name]

    repo = _parse_show_output(cmd_ret=_cmd_run(cmd))

    if not repo:
        log.debug('Unable to find repository: %s', name)
    else:
        log.debug('Found repository: %s', name)

    return repo
def new_repo(name, config_path=_DEFAULT_CONFIG_PATH, comment=None, component=None,
             distribution=None, uploaders_file=None, from_snapshot=None,
             saltenv='base'):
    '''
    Create a new local package repository.

    If a repository with the given name already exists, nothing is changed and
    ``True`` is returned.

    :param str name: The name of the local repository.
    :param str config_path: The path to the configuration file for the aptly instance.
    :param str comment: The description of the repository.
    :param str component: The default component to use when publishing.
    :param str distribution: The default distribution to use when publishing.
    :param str uploaders_file: The repository upload restrictions config.
    :param str from_snapshot: The snapshot to initialize the repository contents from.
    :param str saltenv: The environment the file resides in.

    :return: A boolean representing whether all changes succeeded.
    :rtype: bool

    CLI Example:

    .. code-block:: bash

        salt '*' aptly.new_repo name="test-repo" comment="Test main repo" component="main" distribution="trusty"
    '''
    _validate_config(config_path)

    current_repo = __salt__['aptly.get_repo'](name=name, config_path=config_path)

    if current_repo:
        log.debug('Repository already exists: %s', name)
        return True

    repo_params = _format_repo_args(comment=comment, component=component,
                                    distribution=distribution,
                                    uploaders_file=uploaders_file, saltenv=saltenv)

    # _format_repo_args returns False (not a list) when the uploaders file
    # cannot be cached; extending the command with it would raise TypeError.
    if repo_params is False:
        log.error('Unable to create repo: %s', name)
        return False

    cmd = ['repo', 'create', '-config={}'.format(config_path)]
    cmd.extend(repo_params)
    cmd.append(name)

    # The snapshot source is given as trailing positional words after the name.
    if from_snapshot:
        cmd.extend(['from', 'snapshot', from_snapshot])

    _cmd_run(cmd)

    # Success is determined by looking the repository up again, not by the
    # output of the create command.
    repo = __salt__['aptly.get_repo'](name=name, config_path=config_path)

    if repo:
        log.debug('Created repo: %s', name)
        return True

    log.error('Unable to create repo: %s', name)
    return False
def set_repo(name, config_path=_DEFAULT_CONFIG_PATH, comment=None, component=None,
             distribution=None, uploaders_file=None, saltenv='base'):
    '''
    Configure the settings for a local package repository.

    Only the settings that are passed in (not ``None``) are compared and
    changed. If they already match the repository, nothing is run.

    :param str name: The name of the local repository.
    :param str config_path: The path to the configuration file for the aptly instance.
    :param str comment: The description of the repository.
    :param str component: The default component to use when publishing.
    :param str distribution: The default distribution to use when publishing.
    :param str uploaders_file: The repository upload restrictions config.
    :param str saltenv: The environment the file resides in.

    :return: A boolean representing whether all changes succeeded.
    :rtype: bool

    CLI Example:

    .. code-block:: bash

        salt '*' aptly.set_repo name="test-repo" comment="Test universe repo" component="universe" distribution="xenial"
    '''
    _validate_config(config_path)

    # Only check for settings that were passed in and skip the rest.
    requested = {'comment': comment, 'component': component,
                 'distribution': distribution}
    settings = {key: value for key, value in requested.items() if value is not None}

    current_settings = __salt__['aptly.get_repo'](name=name, config_path=config_path)

    if not current_settings:
        log.error('Unable to get repo: %s', name)
        return False

    # Discard any additional settings that get_repo gives
    # us that are not present in the provided arguments.
    current_settings = {key: value for key, value in current_settings.items()
                        if key in settings}

    # Check the existing repo settings to see if they already have the desired values.
    # NOTE(review): uploaders_file is not part of this comparison, so a call that
    # only changes uploaders_file returns here without running the edit command --
    # confirm whether that is intended.
    if settings == current_settings:
        log.debug('Settings already have the desired values for repository: %s', name)
        return True

    repo_params = _format_repo_args(comment=comment, component=component,
                                    distribution=distribution,
                                    uploaders_file=uploaders_file, saltenv=saltenv)

    # _format_repo_args returns False (not a list) when the uploaders file
    # cannot be cached; extending the command with it would raise TypeError.
    if repo_params is False:
        log.error('Unable to change settings for the repository: %s', name)
        return False

    cmd = ['repo', 'edit', '-config={}'.format(config_path)]
    cmd.extend(repo_params)
    cmd.append(name)

    _cmd_run(cmd)
    new_settings = __salt__['aptly.get_repo'](name=name, config_path=config_path) or {}

    # Check the new repo settings to see if they have the desired values. A
    # setting that is missing from the new data counts as a failure instead
    # of raising KeyError.
    failed_settings = {setting: value for setting, value in settings.items()
                       if new_settings.get(setting) != value}

    if failed_settings:
        log.error('Unable to change settings for the repository: %s', name)
        return False

    log.debug('Settings successfully changed to the desired values for repository: %s', name)
    return True
def delete_repo(name, config_path=_DEFAULT_CONFIG_PATH, force=False):
    '''
    Drop a local package repository.

    If the repository cannot be found beforehand, nothing is run and ``True``
    is returned.

    :param str name: The name of the local repository.
    :param str config_path: The path to the configuration file for the aptly instance.
    :param bool force: Whether to remove the repository even if it is used as the source
        of an existing snapshot.

    :return: A boolean representing whether all changes succeeded.
    :rtype: bool

    CLI Example:

    .. code-block:: bash

        salt '*' aptly.delete_repo name="test-repo"
    '''
    _validate_config(config_path)
    force_flag = six.text_type(bool(force)).lower()

    if not __salt__['aptly.get_repo'](name=name, config_path=config_path):
        log.debug('Repository already absent: %s', name)
        return True

    _cmd_run(['repo', 'drop', '-config={}'.format(config_path),
              '-force={}'.format(force_flag), name])

    # Look the repository up again to confirm that it is really gone.
    still_present = __salt__['aptly.get_repo'](name=name, config_path=config_path)

    if not still_present:
        log.debug('Removed repo: %s', name)
        return True

    log.error('Unable to remove repo: %s', name)
    return False
def list_mirrors(config_path=_DEFAULT_CONFIG_PATH):
    '''
    Return the names of all the mirrored remote repositories.

    :param str config_path: The path to the configuration file for the aptly instance.

    :return: A list of the mirror names.
    :rtype: list

    CLI Example:

    .. code-block:: bash

        salt '*' aptly.list_mirrors
    '''
    _validate_config(config_path)

    output = _cmd_run(['mirror', 'list', '-config={}'.format(config_path), '-raw=true'])

    # One mirror name per output line.
    mirrors = []
    for line in output.splitlines():
        mirrors.append(line.strip())

    log.debug('Found mirrors: %s', len(mirrors))
    return mirrors
def get_mirror(name, config_path=_DEFAULT_CONFIG_PATH, with_packages=False):
    '''
    Get detailed information about a mirrored remote repository.

    :param str name: The name of the remote repository mirror.
    :param str config_path: The path to the configuration file for the aptly instance.
    :param bool with_packages: Return a list of packages in the repo.

    :return: A dictionary containing information about the mirror. The
        dictionary is empty when nothing could be parsed from the output.
    :rtype: dict

    CLI Example:

    .. code-block:: bash

        salt '*' aptly.get_mirror name="test-mirror"
    '''
    _validate_config(config_path)

    # Coerce to a real boolean first (as get_repo does) so that any truthy or
    # falsy input is rendered as the literal "true"/"false".
    with_packages = six.text_type(bool(with_packages)).lower()

    cmd = ['mirror', 'show', '-config={}'.format(config_path),
           '-with-packages={}'.format(with_packages), name]

    cmd_ret = _cmd_run(cmd)
    ret = _parse_show_output(cmd_ret=cmd_ret)

    if ret:
        log.debug('Found mirror: %s', name)
    else:
        log.debug('Unable to find mirror: %s', name)

    return ret
def delete_mirror(name, config_path=_DEFAULT_CONFIG_PATH, force=False):
    '''
    Drop a mirrored remote repository. By default, Package data is not removed.

    If the mirror cannot be found beforehand, nothing is run and ``True`` is
    returned.

    :param str name: The name of the remote repository mirror.
    :param str config_path: The path to the configuration file for the aptly instance.
    :param bool force: Whether to remove the mirror even if it is used as the source
        of an existing snapshot.

    :return: A boolean representing whether all changes succeeded.
    :rtype: bool

    CLI Example:

    .. code-block:: bash

        salt '*' aptly.delete_mirror name="test-mirror"
    '''
    _validate_config(config_path)
    force_flag = six.text_type(bool(force)).lower()

    if not __salt__['aptly.get_mirror'](name=name, config_path=config_path):
        log.debug('Mirror already absent: %s', name)
        return True

    _cmd_run(['mirror', 'drop', '-config={}'.format(config_path),
              '-force={}'.format(force_flag), name])

    # Look the mirror up again to confirm that it is really gone.
    still_present = __salt__['aptly.get_mirror'](name=name, config_path=config_path)

    if not still_present:
        log.debug('Removed mirror: %s', name)
        return True

    log.error('Unable to remove mirror: %s', name)
    return False
def list_published(config_path=_DEFAULT_CONFIG_PATH):
    '''
    Get a list of all the published repositories.

    Each output line is expected to hold a prefix followed by a distribution,
    separated by whitespace. Lines that do not have both parts are skipped.

    :param str config_path: The path to the configuration file for the aptly instance.

    :return: A list of the published repositories, each one a dictionary with
        the keys ``distribution`` and ``prefix``.
    :rtype: list

    CLI Example:

    .. code-block:: bash

        salt '*' aptly.list_published
    '''
    _validate_config(config_path)

    ret = list()
    cmd = ['publish', 'list', '-config={}'.format(config_path), '-raw=true']

    cmd_ret = _cmd_run(cmd)

    for line in cmd_ret.splitlines():
        items = line.split(None, 1)

        # A blank line or one without a separator has no distribution part;
        # skip it rather than failing with an IndexError.
        if len(items) != 2:
            log.debug('Skipping line: %s', line)
            continue

        ret.append({
            'distribution': items[1].strip(),
            'prefix': items[0]
        })

    log.debug('Found published repositories: %s', len(ret))
    return ret
def get_published(name, config_path=_DEFAULT_CONFIG_PATH, endpoint='', prefix=None):
    '''
    Return the parsed ``aptly publish show`` output for a published repository.

    :param str name: The distribution name of the published repository.
    :param str config_path: The path to the configuration file for the aptly instance.
    :param str endpoint: The publishing endpoint. Only used when ``prefix`` is given.
    :param str prefix: The prefix for publishing.

    :return: A dictionary containing information about the published repository.
        The dictionary is empty when nothing could be parsed from the output.
    :rtype: dict

    CLI Example:

    .. code-block:: bash

        salt '*' aptly.get_published name="test-dist"
    '''
    _validate_config(config_path)

    cmd = ['publish', 'show', '-config={}'.format(config_path), name]

    # The endpoint and prefix are passed together as "<endpoint>:<prefix>".
    if prefix:
        cmd.append('{}:{}'.format(endpoint, prefix))

    published = _parse_show_output(cmd_ret=_cmd_run(cmd))

    if not published:
        log.debug('Unable to find published repository: %s', name)
    else:
        log.debug('Found published repository: %s', name)

    return published
def delete_published(name, config_path=_DEFAULT_CONFIG_PATH, endpoint='', prefix=None,
                     skip_cleanup=False, force=False):
    '''
    Drop a published repository. Aptly tries to remove as many files belonging
    to this repository as possible.

    If the published repository cannot be found beforehand, nothing is run and
    ``True`` is returned.

    :param str name: The distribution name of the published repository.
    :param str config_path: The path to the configuration file for the aptly instance.
    :param str endpoint: The publishing endpoint.
    :param str prefix: The prefix for publishing.
    :param bool skip_cleanup: Whether to remove unreferenced files.
    :param bool force: Whether to remove the published repository even if component
        cleanup fails.

    :return: A boolean representing whether all changes succeeded.
    :rtype: bool

    CLI Example:

    .. code-block:: bash

        salt '*' aptly.delete_published name="test-dist"
    '''
    _validate_config(config_path)
    force_flag = six.text_type(bool(force)).lower()
    skip_cleanup_flag = six.text_type(bool(skip_cleanup)).lower()

    # The same lookup arguments are used before and after the drop.
    lookup_kwargs = {'name': name, 'config_path': config_path,
                     'endpoint': endpoint, 'prefix': prefix}

    if not __salt__['aptly.get_published'](**lookup_kwargs):
        log.debug('Published repository already absent: %s', name)
        return True

    cmd = ['publish', 'drop', '-config={}'.format(config_path),
           '-force-drop={}'.format(force_flag),
           '-skip-cleanup={}'.format(skip_cleanup_flag),
           name]

    if prefix:
        cmd.append('{}:{}'.format(endpoint, prefix))

    _cmd_run(cmd)

    # Look the published repository up again to confirm that it is really gone.
    still_present = __salt__['aptly.get_published'](**lookup_kwargs)

    if not still_present:
        log.debug('Removed published snapshot: %s', name)
        return True

    log.error('Unable to remove published snapshot: %s', name)
    return False
def list_snapshots(config_path=_DEFAULT_CONFIG_PATH, sort_by_time=False):
    '''
    Return the names of all the existing snapshots.

    :param str config_path: The path to the configuration file for the aptly instance.
    :param bool sort_by_time: Whether to sort by creation time instead of by name.

    :return: A list of the snapshot names.
    :rtype: list

    CLI Example:

    .. code-block:: bash

        salt '*' aptly.list_snapshots
    '''
    _validate_config(config_path)

    sort_arg = '-sort=time' if sort_by_time else '-sort=name'
    cmd = ['snapshot', 'list', '-config={}'.format(config_path), '-raw=true', sort_arg]

    # One snapshot name per output line.
    snapshots = []
    for line in _cmd_run(cmd).splitlines():
        snapshots.append(line.strip())

    log.debug('Found snapshots: %s', len(snapshots))
    return snapshots
def get_snapshot(name, config_path=_DEFAULT_CONFIG_PATH, with_packages=False):
    '''
    Get detailed information about a snapshot.

    :param str name: The name of the snapshot given during snapshot creation.
    :param str config_path: The path to the configuration file for the aptly instance.
    :param bool with_packages: Return a list of packages in the snapshot.

    :return: A dictionary containing information about the snapshot. The
        dictionary is empty when nothing could be parsed from the output.
    :rtype: dict

    CLI Example:

    .. code-block:: bash

        salt '*' aptly.get_snapshot name="test-repo"
    '''
    _validate_config(config_path)

    # Coerce to a real boolean first (as get_repo does) so that any truthy or
    # falsy input is rendered as the literal "true"/"false".
    with_packages = six.text_type(bool(with_packages)).lower()

    cmd = ['snapshot', 'show', '-config={}'.format(config_path),
           '-with-packages={}'.format(with_packages), name]

    cmd_ret = _cmd_run(cmd)
    ret = _parse_show_output(cmd_ret=cmd_ret)

    if ret:
        log.debug('Found snapshot: %s', name)
    else:
        log.debug('Unable to find snapshot: %s', name)

    return ret
def delete_snapshot(name, config_path=_DEFAULT_CONFIG_PATH, force=False):
    '''
    Drop a snapshot. If a snapshot is published, it can not be dropped without
    first removing publishing for that snapshot. If a snapshot is used as the
    source for other snapshots, Aptly will refuse to remove it unless forced.

    If the snapshot cannot be found beforehand, nothing is run and ``True`` is
    returned.

    :param str name: The name of the snapshot given during snapshot creation.
    :param str config_path: The path to the configuration file for the aptly instance.
    :param bool force: Whether to remove the snapshot even if it is used as the source
        of another snapshot.

    :return: A boolean representing whether all changes succeeded.
    :rtype: bool

    CLI Example:

    .. code-block:: bash

        salt '*' aptly.delete_snapshot name="test-snapshot"
    '''
    _validate_config(config_path)
    force_flag = six.text_type(bool(force)).lower()

    if not __salt__['aptly.get_snapshot'](name=name, config_path=config_path):
        log.debug('Snapshot already absent: %s', name)
        return True

    _cmd_run(['snapshot', 'drop', '-config={}'.format(config_path),
              '-force={}'.format(force_flag), name])

    # Look the snapshot up again to confirm that it is really gone.
    still_present = __salt__['aptly.get_snapshot'](name=name, config_path=config_path)

    if not still_present:
        log.debug('Removed snapshot: %s', name)
        return True

    log.error('Unable to remove snapshot: %s', name)
    return False
def cleanup_db(config_path=_DEFAULT_CONFIG_PATH, dry_run=False):
    '''
    Remove data regarding unreferenced packages and delete files in the package pool that
    are no longer being used by packages.

    The verbose command output is scanned for "List ... keys/files ...:" headers,
    and the ``- <item>`` lines following each header are collected.

    :param str config_path: The path to the configuration file for the aptly instance.
    :param bool dry_run: Report potential changes without making any changes.

    :return: A dictionary of the package keys and files that were removed.
    :rtype: dict

    CLI Example:

    .. code-block:: bash

        salt '*' aptly.cleanup_db
    '''
    _validate_config(config_path)
    dry_run_flag = six.text_type(bool(dry_run)).lower()

    removed = {'deleted_keys': list(),
               'deleted_files': list()}

    output = _cmd_run(['db', 'cleanup', '-config={}'.format(config_path),
                       '-dry-run={}'.format(dry_run_flag), '-verbose=true'])

    header_re = re.compile(r'^List\s+[\w\s]+(?P<package_type>(file|key)s)[\w\s]+:$')
    entry_re = re.compile(r'^\s+-\s+(?P<package>.*)$')

    # "keys" or "files" while inside a list, otherwise None.
    section = None

    for line in output.splitlines():
        if section:
            entry = entry_re.search(line)

            if entry:
                removed['deleted_{}'.format(section)].append(entry.group('package'))
            else:
                # The first non-entry line ends the current list.
                section = None

        # Intentionally not using an else here, in case of a situation where
        # the next list header might be bordered by the previous list.
        if not section:
            header = header_re.search(line)

            if header:
                section = header.group('package_type')

    log.debug('Package keys identified for deletion: %s', len(removed['deleted_keys']))
    log.debug('Package files identified for deletion: %s', len(removed['deleted_files']))
    return removed