# -*- coding: utf-8 -*-

An engine that listens for libvirt events and resends them to the salt event bus.

The minimal configuration is the following and will listen to all events on the
local hypervisor and send them with a tag starting with ``salt/engines/libvirt_events``:

.. code-block:: yaml

        - libvirt_events

Note that the automatically-picked libvirt connection will depend on the value
of ``uri_default`` in ``/etc/libvirt/libvirt.conf``. To force using another
connection like the local LXC libvirt driver, set the ``uri`` property as in the
following example configuration.

.. code-block:: yaml

        - libvirt_events:
            uri: lxc:///
            tag_prefix: libvirt
                - domain/lifecycle
                - domain/reboot
                - pool

Filters is a list of event types to relay to the event bus. Items in this list
can be either one of the main types (``domain``, ``network``, ``pool``,
``nodedev``, ``secret``), ``all`` or a more precise filter. These can be done
with values like <main_type>/<subtype>. The possible values are in the
CALLBACK_DEFS constant. If the filters list contains ``all``, all
events will be relayed.

Be aware that the list of events increases with libvirt versions, for example
network events have been added in libvirt 1.2.1.

Running the engine on non-root

Running this engine as non-root requires a special attention, which is surely
the case for the master running as user `salt`. The engine is likely to fail
to connect to libvirt with an error like this one:

    [ERROR   ] authentication unavailable: no polkit agent available to authenticate action 'org.libvirt.unix.monitor'

To fix this, the user running the engine, for example the salt-master, needs
to have the rights to connect to libvirt in the machine polkit config.
A polkit rule like the following one will allow `salt` user to connect to libvirt:

.. code-block:: javascript

    polkit.addRule(function(action, subject) {
        if ("org.libvirt") == 0 &&
            subject.user == "salt") {
            return polkit.Result.YES;

:depends: libvirt 1.0.0+ python binding

.. versionadded:: 2019.2.0

from __future__ import absolute_import, unicode_literals, print_function
import logging

# Import salt libs
import salt.utils.event

# pylint: disable=no-name-in-module,import-error
from salt.ext.six.moves.urllib.parse import urlparse
# pylint: enable=no-name-in-module,import-error

log = logging.getLogger(__name__)

    import libvirt
except ImportError:
    libvirt = None  # pylint: disable=invalid-name

def __virtual__():
    Only load if libvirt python binding is present
    if libvirt is None:
        msg = 'libvirt module not found'
    elif libvirt.getVersion() < 1000000:
        msg = 'libvirt >= 1.0.0 required'
        msg = ''
    return not bool(msg), msg

    'domain': 'domainEventRegisterAny',
    'network': 'networkEventRegisterAny',
    'pool': 'storagePoolEventRegisterAny',
    'nodedev': 'nodeDeviceEventRegisterAny',
    'secret': 'secretEventRegisterAny'

# Handle either BLOCK_JOB or BLOCK_JOB_2, but prefer the latter
if hasattr(libvirt, 'VIR_DOMAIN_EVENT_ID_BLOCK_JOB_2'):

    'domain': (('lifecycle', None),
               ('reboot', None),
               ('rtc_change', None),
               ('watchdog', None),
               ('graphics', None),
               ('io_error', 'VIR_DOMAIN_EVENT_ID_IO_ERROR_REASON'),
               ('control_error', None),
               ('disk_change', None),
               ('tray_change', None),
               ('pmwakeup', None),
               ('pmsuspend', None),
               ('balloon_change', None),
               ('pmsuspend_disk', None),
               ('device_removed', None),
               ('block_job', BLOCK_JOB_ID),
               ('tunable', None),
               ('agent_lifecycle', None),
               ('device_added', None),
               ('migration_iteration', None),
               ('job_completed', None),
               ('device_removal_failed', None),
               ('metadata_change', None),
               ('block_threshold', None)),
    'network': (('lifecycle', None),),
    'pool': (('lifecycle', None),
             ('refresh', None)),
    'nodedev': (('lifecycle', None),
                ('update', None)),
    'secret': (('lifecycle', None),
               ('value_changed', None))

def _compute_subprefix(attr):
    Get the part before the first '_' or the end of attr including
    the potential '_'
    return ''.join((attr.split('_')[0], '_' if len(attr.split('_')) > 1 else ''))

def _get_libvirt_enum_string(prefix, value):
    Convert the libvirt enum integer value into a human readable string.

    :param prefix: start of the libvirt attribute to look for.
    :param value: integer to convert to string
    attributes = [attr[len(prefix):] for attr in libvirt.__dict__ if attr.startswith(prefix)]

    # Filter out the values starting with a common base as they match another enum
    prefixes = [_compute_subprefix(p) for p in attributes]
    counts = {p: prefixes.count(p) for p in prefixes}
    sub_prefixes = [p for p, count in counts.items() if count > 1 or (p.endswith('_') and p[:-1] in prefixes)]
    filtered = [attr for attr in attributes if _compute_subprefix(attr) not in sub_prefixes]

    for candidate in filtered:
        if value == getattr(libvirt, ''.join((prefix, candidate))):
            name = candidate.lower().replace('_', ' ')
            return name
    return 'unknown'

def _get_domain_event_detail(event, detail):
    Convert event and detail numeric values into a tuple of human readable strings
    event_name = _get_libvirt_enum_string('VIR_DOMAIN_EVENT_', event)
    if event_name == 'unknown':
        return event_name, 'unknown'

    prefix = 'VIR_DOMAIN_EVENT_{0}_'.format(event_name.upper())
    detail_name = _get_libvirt_enum_string(prefix, detail)

    return event_name, detail_name

def _salt_send_event(opaque, conn, data):
    Convenience function adding common data to the event and sending it
    on the salt event bus.

    :param opaque: the opaque data that is passed to the callback.
                   This is a dict with 'prefix', 'object' and 'event' keys.
    :param conn: libvirt connection
    :param data: additional event data dict to send
    tag_prefix = opaque['prefix']
    object_type = opaque['object']
    event_type = opaque['event']

    # Prepare the connection URI to fit in the tag
    # qemu+ssh://user@host:1234/system -> qemu+ssh/user@host:1234/system
    uri = urlparse(conn.getURI())
    uri_tag = [uri.scheme]
    if uri.netloc:
    path = uri.path.strip('/')
    if path:
    uri_str = "/".join(uri_tag)

    # Append some common data
    all_data = {
        'uri': conn.getURI()

    tag = '/'.join((tag_prefix, uri_str, object_type, event_type))

    # Actually send the event in salt
    if __opts__.get('__role') == 'master':
            __opts__['sock_dir']).fire_event(all_data, tag)
        __salt__['event.send'](tag, all_data)

def _salt_send_domain_event(opaque, conn, domain, event, event_data):
    Helper function send a salt event for a libvirt domain.

    :param opaque: the opaque data that is passed to the callback.
                   This is a dict with 'prefix', 'object' and 'event' keys.
    :param conn: libvirt connection
    :param domain: name of the domain related to the event
    :param event: name of the event
    :param event_data: additional event data dict to send
    data = {
        'domain': {
            'id': domain.ID(),
            'uuid': domain.UUIDString()
        'event': event
    _salt_send_event(opaque, conn, data)

def _domain_event_lifecycle_cb(conn, domain, event, detail, opaque):
    Domain lifecycle events handler
    event_str, detail_str = _get_domain_event_detail(event, detail)

    _salt_send_domain_event(opaque, conn, domain, opaque['event'], {
        'event':  event_str,
        'detail': detail_str

def _domain_event_reboot_cb(conn, domain, opaque):
    Domain reboot events handler
    _salt_send_domain_event(opaque, conn, domain, opaque['event'], {})

def _domain_event_rtc_change_cb(conn, domain, utcoffset, opaque):
    Domain RTC change events handler
    _salt_send_domain_event(opaque, conn, domain, opaque['event'], {
        'utcoffset': utcoffset

def _domain_event_watchdog_cb(conn, domain, action, opaque):
    Domain watchdog events handler
    _salt_send_domain_event(opaque, conn, domain, opaque['event'], {
        'action': _get_libvirt_enum_string('VIR_DOMAIN_EVENT_WATCHDOG_', action)

def _domain_event_io_error_cb(conn, domain, srcpath, devalias, action, reason, opaque):
    Domain I/O Error events handler
    _salt_send_domain_event(opaque, conn, domain, opaque['event'], {
        'srcPath': srcpath,
        'dev': devalias,
        'action': _get_libvirt_enum_string('VIR_DOMAIN_EVENT_IO_ERROR_', action),
        'reason': reason

def _domain_event_graphics_cb(conn, domain, phase, local, remote, auth, subject, opaque):
    Domain graphics events handler

    def get_address(addr):
        transform address structure into event data piece
        return {'family': _get_libvirt_enum_string('{0}_ADDRESS_'.format(prefix), addr['family']),
                'node': addr['node'],
                'service': addr['service']}

    _salt_send_domain_event(opaque, conn, domain, opaque['event'], {
        'phase': _get_libvirt_enum_string(prefix, phase),
        'local': get_address(local),
        'remote': get_address(remote),
        'authScheme': auth,
        'subject': [{'type': item[0], 'name': item[1]} for item in subject]

def _domain_event_control_error_cb(conn, domain, opaque):
    Domain control error events handler
    _salt_send_domain_event(opaque, conn, domain, opaque['event'], {})

def _domain_event_disk_change_cb(conn, domain, old_src, new_src, dev, reason, opaque):
    Domain disk change events handler
    _salt_send_domain_event(opaque, conn, domain, opaque['event'], {
        'oldSrcPath': old_src,
        'newSrcPath': new_src,
        'dev': dev,
        'reason': _get_libvirt_enum_string('VIR_DOMAIN_EVENT_DISK_', reason)

def _domain_event_tray_change_cb(conn, domain, dev, reason, opaque):
    Domain tray change events handler
    _salt_send_domain_event(opaque, conn, domain, opaque['event'], {
        'dev': dev,
        'reason': _get_libvirt_enum_string('VIR_DOMAIN_EVENT_TRAY_CHANGE_', reason)

def _domain_event_pmwakeup_cb(conn, domain, reason, opaque):
    Domain wakeup events handler
    _salt_send_domain_event(opaque, conn, domain, opaque['event'], {
        'reason': 'unknown'  # currently unused

def _domain_event_pmsuspend_cb(conn, domain, reason, opaque):
    Domain suspend events handler
    _salt_send_domain_event(opaque, conn, domain, opaque['event'], {
        'reason': 'unknown'  # currently unused

def _domain_event_balloon_change_cb(conn, domain, actual, opaque):
    Domain balloon change events handler
    _salt_send_domain_event(opaque, conn, domain, opaque['event'], {
        'actual': actual

def _domain_event_pmsuspend_disk_cb(conn, domain, reason, opaque):
    Domain disk suspend events handler
    _salt_send_domain_event(opaque, conn, domain, opaque['event'], {
        'reason': 'unknown'  # currently unused

def _domain_event_block_job_cb(conn, domain, disk, job_type, status, opaque):
    Domain block job events handler
    _salt_send_domain_event(opaque, conn, domain, opaque['event'], {
        'disk': disk,
        'type': _get_libvirt_enum_string('VIR_DOMAIN_BLOCK_JOB_TYPE_', job_type),
        'status': _get_libvirt_enum_string('VIR_DOMAIN_BLOCK_JOB_', status)

def _domain_event_device_removed_cb(conn, domain, dev, opaque):
    Domain device removal events handler
    _salt_send_domain_event(opaque, conn, domain, opaque['event'], {
        'dev': dev

def _domain_event_tunable_cb(conn, domain, params, opaque):
    Domain tunable events handler
    _salt_send_domain_event(opaque, conn, domain, opaque['event'], {
        'params': params

# pylint: disable=invalid-name
def _domain_event_agent_lifecycle_cb(conn, domain, state, reason, opaque):
    Domain agent lifecycle events handler
    _salt_send_domain_event(opaque, conn, domain, opaque['event'], {
        'state': _get_libvirt_enum_string('VIR_CONNECT_DOMAIN_EVENT_AGENT_LIFECYCLE_STATE_', state),
        'reason': _get_libvirt_enum_string('VIR_CONNECT_DOMAIN_EVENT_AGENT_LIFECYCLE_REASON_', reason)

def _domain_event_device_added_cb(conn, domain, dev, opaque):
    Domain device addition events handler
    _salt_send_domain_event(opaque, conn, domain, opaque['event'], {
        'dev': dev

# pylint: disable=invalid-name
def _domain_event_migration_iteration_cb(conn, domain, iteration, opaque):
    Domain migration iteration events handler
    _salt_send_domain_event(opaque, conn, domain, opaque['event'], {
        'iteration': iteration

def _domain_event_job_completed_cb(conn, domain, params, opaque):
    Domain job completion events handler
    _salt_send_domain_event(opaque, conn, domain, opaque['event'], {
        'params': params

def _domain_event_device_removal_failed_cb(conn, domain, dev, opaque):
    Domain device removal failure events handler
    _salt_send_domain_event(opaque, conn, domain, opaque['event'], {
        'dev': dev

def _domain_event_metadata_change_cb(conn, domain, mtype, nsuri, opaque):
    Domain metadata change events handler
    _salt_send_domain_event(opaque, conn, domain, opaque['event'], {
        'type': _get_libvirt_enum_string('VIR_DOMAIN_METADATA_', mtype),
        'nsuri': nsuri

def _domain_event_block_threshold_cb(conn, domain, dev, path, threshold, excess, opaque):
    Domain block threshold events handler
    _salt_send_domain_event(opaque, conn, domain, opaque['event'], {
        'dev': dev,
        'path': path,
        'threshold': threshold,
        'excess': excess

def _network_event_lifecycle_cb(conn, net, event, detail, opaque):
    Network lifecycle events handler

    _salt_send_event(opaque, conn, {
        'network': {
            'uuid': net.UUIDString()
        'event': _get_libvirt_enum_string('VIR_NETWORK_EVENT_', event),
        'detail': 'unknown'  # currently unused

def _pool_event_lifecycle_cb(conn, pool, event, detail, opaque):
    Storage pool lifecycle events handler
    _salt_send_event(opaque, conn, {
        'pool': {
            'uuid': pool.UUIDString()
        'event': _get_libvirt_enum_string('VIR_STORAGE_POOL_EVENT_', event),
        'detail': 'unknown'  # currently unused

def _pool_event_refresh_cb(conn, pool, opaque):
    Storage pool refresh events handler
    _salt_send_event(opaque, conn, {
        'pool': {
            'uuid': pool.UUIDString()
        'event': opaque['event']

def _nodedev_event_lifecycle_cb(conn, dev, event, detail, opaque):
    Node device lifecycle events handler
    _salt_send_event(opaque, conn, {
        'nodedev': {
        'event': _get_libvirt_enum_string('VIR_NODE_DEVICE_EVENT_', event),
        'detail': 'unknown'  # currently unused

def _nodedev_event_update_cb(conn, dev, opaque):
    Node device update events handler
    _salt_send_event(opaque, conn, {
        'nodedev': {
        'event': opaque['event']

def _secret_event_lifecycle_cb(conn, secret, event, detail, opaque):
    Secret lifecycle events handler
    _salt_send_event(opaque, conn, {
        'secret': {
            'uuid': secret.UUIDString()
        'event': _get_libvirt_enum_string('VIR_SECRET_EVENT_', event),
        'detail': 'unknown'  # currently unused

def _secret_event_value_changed_cb(conn, secret, opaque):
    Secret value change events handler
    _salt_send_event(opaque, conn, {
        'secret': {
            'uuid': secret.UUIDString()
        'event': opaque['event']

def _cleanup(cnx):
    Close the libvirt connection

    :param cnx: libvirt connection
    log.debug('Closing libvirt connection: %s', cnx.getURI())

def _callbacks_cleanup(cnx, callback_ids):
    Unregister all the registered callbacks

    :param cnx: libvirt connection
    :param callback_ids: dictionary mapping a libvirt object type to an ID list
                         of callbacks to deregister
    for obj, ids in callback_ids.items():
        register_name = REGISTER_FUNCTIONS[obj]
        deregister_name = register_name.replace('Reg', 'Dereg')
        deregister = getattr(cnx, deregister_name)
        for callback_id in ids:

def _register_callback(cnx, tag_prefix, obj, event, real_id):
    Helper function registering a callback

    :param cnx: libvirt connection
    :param tag_prefix: salt event tag prefix to use
    :param obj: the libvirt object name for the event. Needs to
                be one of the REGISTER_FUNCTIONS keys.
    :param event: the event type name.
    :param real_id: the libvirt name of an alternative event id to use or None

    :rtype integer value needed to deregister the callback
    libvirt_name = real_id
    if real_id is None:
        libvirt_name = 'VIR_{0}_EVENT_ID_{1}'.format(obj, event).upper()

    if not hasattr(libvirt, libvirt_name):
        log.warning('Skipping "%s/%s" events: libvirt too old', obj, event)
        return None

    libvirt_id = getattr(libvirt, libvirt_name)
    callback_name = "_{0}_event_{1}_cb".format(obj, event)
    callback = globals().get(callback_name, None)
    if callback is None:
        log.error('Missing function %s in engine', callback_name)
        return None

    register = getattr(cnx, REGISTER_FUNCTIONS[obj])
    return register(None, libvirt_id, callback,
                    {'prefix': tag_prefix,
                     'object': obj,
                     'event': event})

def _append_callback_id(ids, obj, callback_id):
    Helper function adding a callback ID to the IDs dict.
    The callback ids dict maps an object to event callback ids.

    :param ids: dict of callback IDs to update
    :param obj: one of the keys of REGISTER_FUNCTIONS
    :param callback_id: the result of _register_callback
    if obj not in ids:
        ids[obj] = []

def start(uri=None,
    Listen to libvirt events and forward them to salt.

    :param uri: libvirt URI to listen on.
                Defaults to None to pick the first available local hypervisor
    :param tag_prefix: the begining of the salt event tag to use.
                       Defaults to 'salt/engines/libvirt_events'
    :param filters: the list of event of listen on. Defaults to 'all'
    if filters is None:
        filters = ['all']

        cnx = libvirt.openReadOnly(uri)
        log.debug('Opened libvirt uri: %s', cnx.getURI())

        callback_ids = {}
        all_filters = "all" in filters

        for obj, event_defs in CALLBACK_DEFS.items():
            for event, real_id in event_defs:
                event_filter = "/".join((obj, event))
                if event_filter not in filters and obj not in filters and not all_filters:
                registered_id = _register_callback(cnx, tag_prefix,
                                                   obj, event, real_id)
                if registered_id:
                    _append_callback_id(callback_ids, obj, registered_id)

        exit_loop = False
        while not exit_loop:
            exit_loop = libvirt.virEventRunDefaultImpl() < 0
            log.debug('=== in the loop exit_loop %s ===', exit_loop)

    except Exception as err:  # pylint: disable=broad-except
        _callbacks_cleanup(cnx, callback_ids)