saltstack/salt

View on GitHub
salt/modules/elasticsearch.py

Summary

Maintainability
F
1 wk
Test Coverage
# -*- coding: utf-8 -*-
'''
Elasticsearch - A distributed RESTful search and analytics server

Module to provide Elasticsearch compatibility to Salt
(compatible with Elasticsearch version 1.5.2+)

.. versionadded:: 2015.8.0

:depends:       `elasticsearch-py <http://elasticsearch-py.readthedocs.org/en/latest/>`_

:configuration: This module accepts connection configuration details either as
    parameters or as configuration settings in /etc/salt/minion on the relevant
    minions:

    .. code-block:: yaml

        elasticsearch:
          host: '10.10.10.100:9200'

        elasticsearch-cluster:
          hosts:
            - '10.10.10.100:9200'
            - '10.10.10.101:9200'
            - '10.10.10.102:9200'

        elasticsearch-extra:
          hosts:
            - '10.10.10.100:9200'
          use_ssl: True
          verify_certs: True
          ca_certs: /path/to/custom_ca_bundle.pem
          number_of_shards: 1
          number_of_replicas: 0
          functions_blacklist:
            - 'saltutil.find_job'
            - 'pillar.items'
            - 'grains.items'
          proxies:
            - http: http://proxy:3128
            - https: http://proxy:1080

    When specifying proxies the requests backend will be used and the 'proxies'
    data structure is passed as-is to that module.

    This data can also be passed into pillar. Options passed into opts will
    overwrite options passed into pillar.

    Some functionality might be limited by elasticsearch-py and Elasticsearch server versions.
'''

# Import Python libs
from __future__ import absolute_import, print_function, unicode_literals
import logging
import sys

# Import Salt Libs
from salt.exceptions import CommandExecutionError, SaltInvocationError
from salt.ext import six

log = logging.getLogger(__name__)

# Import third party libs
try:
    import elasticsearch
    from elasticsearch import RequestsHttpConnection
    logging.getLogger('elasticsearch').setLevel(logging.CRITICAL)
    HAS_ELASTICSEARCH = True
except ImportError:
    HAS_ELASTICSEARCH = False


def __virtual__():
    '''
    Only load if elasticsearch libraries exist.
    '''
    if not HAS_ELASTICSEARCH:
        return (False, 'Cannot load module elasticsearch: elasticsearch libraries not found')
    return True


def _get_instance(hosts=None, profile=None):
    '''
    Return the elasticsearch instance
    '''
    es = None
    proxies = None
    use_ssl = False
    ca_certs = None
    verify_certs = True
    http_auth = None
    timeout = 10

    if profile is None:
        profile = 'elasticsearch'

    if isinstance(profile, six.string_types):
        _profile = __salt__['config.option'](profile, None)
    elif isinstance(profile, dict):
        _profile = profile
    if _profile:
        hosts = _profile.get('host', hosts)
        if not hosts:
            hosts = _profile.get('hosts', hosts)
        proxies = _profile.get('proxies', None)
        use_ssl = _profile.get('use_ssl', False)
        ca_certs = _profile.get('ca_certs', None)
        verify_certs = _profile.get('verify_certs', True)
        username = _profile.get('username', None)
        password = _profile.get('password', None)
        timeout = _profile.get('timeout', 10)

        if username and password:
            http_auth = (username, password)

    if not hosts:
        hosts = ['127.0.0.1:9200']
    if isinstance(hosts, six.string_types):
        hosts = [hosts]
    try:
        if proxies:
            # Custom connection class to use requests module with proxies
            class ProxyConnection(RequestsHttpConnection):
                def __init__(self, *args, **kwargs):
                    proxies = kwargs.pop('proxies', {})
                    super(ProxyConnection, self).__init__(*args, **kwargs)
                    self.session.proxies = proxies

            es = elasticsearch.Elasticsearch(
                hosts,
                connection_class=ProxyConnection,
                proxies=proxies,
                use_ssl=use_ssl,
                ca_certs=ca_certs,
                verify_certs=verify_certs,
                http_auth=http_auth,
                timeout=timeout,
            )
        else:
            es = elasticsearch.Elasticsearch(
                    hosts,
                    use_ssl=use_ssl,
                    ca_certs=ca_certs,
                    verify_certs=verify_certs,
                    http_auth=http_auth,
                    timeout=timeout,
                )

        # Try the connection
        es.info()
    except elasticsearch.exceptions.TransportError as err:
        raise CommandExecutionError(
            'Could not connect to Elasticsearch host/ cluster {0} due to {1}'.format(hosts, err))
    return es


def ping(allow_failure=False, hosts=None, profile=None):
    '''
    .. versionadded:: 2017.7.0

    Test connection to Elasticsearch instance. This method does not fail if not explicitly specified.

    allow_failure
        Throw exception if ping fails

    CLI example::

        salt myminion elasticsearch.ping allow_failure=True
        salt myminion elasticsearch.ping profile=elasticsearch-extra
    '''
    try:
        _get_instance(hosts, profile)
    except CommandExecutionError as e:
        if allow_failure:
            six.reraise(*sys.exc_info())
        return False
    return True


def info(hosts=None, profile=None):
    '''
    .. versionadded:: 2017.7.0

    Return Elasticsearch information.

    CLI example::

        salt myminion elasticsearch.info
        salt myminion elasticsearch.info profile=elasticsearch-extra
    '''
    es = _get_instance(hosts, profile)

    try:
        return es.info()
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot retrieve server information, server returned code {0} with message {1}".format(e.status_code, e.error))


def node_info(nodes=None, flat_settings=False, hosts=None, profile=None):
    '''
    .. versionadded:: 2017.7.0

    Return Elasticsearch node information.

    nodes
        List of cluster nodes (id or name) to display stats for. Use _local for connected node, empty for all
    flat_settings
        Flatten settings keys

    CLI example::

        salt myminion elasticsearch.node_info flat_settings=True
    '''
    es = _get_instance(hosts, profile)

    try:
        return es.nodes.info(node_id=nodes, flat_settings=flat_settings)
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot retrieve node information, server returned code {0} with message {1}".format(e.status_code, e.error))


def cluster_health(index=None, level='cluster', local=False, hosts=None, profile=None):
    '''
    .. versionadded:: 2017.7.0

    Return Elasticsearch cluster health.

    index
        Limit the information returned to a specific index
    level
        Specify the level of detail for returned information, default 'cluster', valid choices are: 'cluster', 'indices', 'shards'
    local
        Return local information, do not retrieve the state from master node

    CLI example::

        salt myminion elasticsearch.cluster_health
    '''
    es = _get_instance(hosts, profile)

    try:
        return es.cluster.health(index=index, level=level, local=local)
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot retrieve health information, server returned code {0} with message {1}".format(e.status_code, e.error))


def cluster_stats(nodes=None, hosts=None, profile=None):
    '''
    .. versionadded:: 2017.7.0

    Return Elasticsearch cluster stats.

    nodes
        List of cluster nodes (id or name) to display stats for. Use _local for connected node, empty for all

    CLI example::

        salt myminion elasticsearch.cluster_stats
    '''
    es = _get_instance(hosts, profile)

    try:
        return es.cluster.stats(node_id=nodes)
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot retrieve cluster stats, server returned code {0} with message {1}".format(e.status_code, e.error))


def alias_create(indices, alias, hosts=None, body=None, profile=None, source=None):
    '''
    Create an alias for a specific index/indices

    indices
        Single or multiple indices separated by comma, use _all to perform the operation on all indices.
    alias
        Alias name
    body
        Optional definition such as routing or filter as defined in https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-aliases.html
    source
        URL of file specifying optional definition such as routing or filter. Cannot be used in combination with ``body``.

    CLI example::

        salt myminion elasticsearch.alias_create testindex_v1 testindex
    '''
    es = _get_instance(hosts, profile)
    if source and body:
        message = 'Either body or source should be specified but not both.'
        raise SaltInvocationError(message)
    if source:
        body = __salt__['cp.get_file_str'](
                  source,
                  saltenv=__opts__.get('saltenv', 'base'))
    try:
        result = es.indices.put_alias(index=indices, name=alias, body=body)
        return result.get('acknowledged', False)
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot create alias {0} in index {1}, server returned code {2} with message {3}".format(alias, indices, e.status_code, e.error))


def alias_delete(indices, aliases, hosts=None, body=None, profile=None, source=None):
    '''
    Delete an alias of an index

    indices
        Single or multiple indices separated by comma, use _all to perform the operation on all indices.
    aliases
        Alias names separated by comma

    CLI example::

        salt myminion elasticsearch.alias_delete testindex_v1 testindex
    '''
    es = _get_instance(hosts, profile)
    if source and body:
        message = 'Either body or source should be specified but not both.'
        raise SaltInvocationError(message)
    if source:
        body = __salt__['cp.get_file_str'](
                  source,
                  saltenv=__opts__.get('saltenv', 'base'))
    try:
        result = es.indices.delete_alias(index=indices, name=aliases)

        return result.get('acknowledged', False)
    except elasticsearch.exceptions.NotFoundError:
        return True
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot delete alias {0} in index {1}, server returned code {2} with message {3}".format(aliases, indices, e.status_code, e.error))


def alias_exists(aliases, indices=None, hosts=None, profile=None):
    '''
    Return a boolean indicating whether given alias exists

    indices
        Single or multiple indices separated by comma, use _all to perform the operation on all indices.
    aliases
        Alias names separated by comma

    CLI example::

        salt myminion elasticsearch.alias_exists None testindex
    '''
    es = _get_instance(hosts, profile)
    try:
        return es.indices.exists_alias(name=aliases, index=indices)
    except elasticsearch.exceptions.NotFoundError:
        return False
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot get alias {0} in index {1}, server returned code {2} with message {3}".format(aliases, indices, e.status_code, e.error))


def alias_get(indices=None, aliases=None, hosts=None, profile=None):
    '''
    Check for the existence of an alias and if it exists, return it

    indices
        Single or multiple indices separated by comma, use _all to perform the operation on all indices.
    aliases
        Alias names separated by comma

    CLI example::

        salt myminion elasticsearch.alias_get testindex
    '''
    es = _get_instance(hosts, profile)

    try:
        return es.indices.get_alias(index=indices, name=aliases)
    except elasticsearch.exceptions.NotFoundError:
        return None
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot get alias {0} in index {1}, server returned code {2} with message {3}".format(aliases, indices, e.status_code, e.error))


def document_create(index, doc_type, body=None, id=None, hosts=None, profile=None, source=None):
    '''
    Create a document in a specified index

    index
        Index name where the document should reside
    doc_type
        Type of the document
    body
        Document to store
    source
        URL of file specifying document to store. Cannot be used in combination with ``body``.
    id
        Optional unique document identifier for specified doc_type (empty for random)

    CLI example::

        salt myminion elasticsearch.document_create testindex doctype1 '{}'
    '''
    es = _get_instance(hosts, profile)
    if source and body:
        message = 'Either body or source should be specified but not both.'
        raise SaltInvocationError(message)
    if source:
        body = __salt__['cp.get_file_str'](
                  source,
                  saltenv=__opts__.get('saltenv', 'base'))
    try:
        return es.index(index=index, doc_type=doc_type, body=body, id=id)
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot create document in index {0}, server returned code {1} with message {2}".format(index, e.status_code, e.error))


def document_delete(index, doc_type, id, hosts=None, profile=None):
    '''
    Delete a document from an index

    index
        Index name where the document resides
    doc_type
        Type of the document
    id
        Document identifier

    CLI example::

        salt myminion elasticsearch.document_delete testindex doctype1 AUx-384m0Bug_8U80wQZ
    '''
    es = _get_instance(hosts, profile)

    try:
        return es.delete(index=index, doc_type=doc_type, id=id)
    except elasticsearch.exceptions.NotFoundError:
        return None
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot delete document {0} in index {1}, server returned code {2} with message {3}".format(id, index, e.status_code, e.error))


def document_exists(index, id, doc_type='_all', hosts=None, profile=None):
    '''
    Return a boolean indicating whether given document exists

    index
        Index name where the document resides
    id
        Document identifier
    doc_type
        Type of the document, use _all to fetch the first document matching the ID across all types

    CLI example::

        salt myminion elasticsearch.document_exists testindex AUx-384m0Bug_8U80wQZ
    '''
    es = _get_instance(hosts, profile)

    try:
        return es.exists(index=index, id=id, doc_type=doc_type)
    except elasticsearch.exceptions.NotFoundError:
        return False
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot retrieve document {0} from index {1}, server returned code {2} with message {3}".format(id, index, e.status_code, e.error))


def document_get(index, id, doc_type='_all', hosts=None, profile=None):
    '''
    Check for the existence of a document and if it exists, return it

    index
        Index name where the document resides
    id
        Document identifier
    doc_type
        Type of the document, use _all to fetch the first document matching the ID across all types

    CLI example::

        salt myminion elasticsearch.document_get testindex AUx-384m0Bug_8U80wQZ
    '''
    es = _get_instance(hosts, profile)

    try:
        return es.get(index=index, id=id, doc_type=doc_type)
    except elasticsearch.exceptions.NotFoundError:
        return None
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot retrieve document {0} from index {1}, server returned code {2} with message {3}".format(id, index, e.status_code, e.error))


def index_create(index, body=None, hosts=None, profile=None, source=None):
    '''
    Create an index

    index
        Index name
    body
        Index definition, such as settings and mappings as defined in https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html
    source
        URL to file specifying index definition. Cannot be used in combination with ``body``.

    CLI example::

        salt myminion elasticsearch.index_create testindex
        salt myminion elasticsearch.index_create testindex2 '{"settings" : {"index" : {"number_of_shards" : 3, "number_of_replicas" : 2}}}'
    '''
    es = _get_instance(hosts, profile)
    if source and body:
        message = 'Either body or source should be specified but not both.'
        raise SaltInvocationError(message)
    if source:
        body = __salt__['cp.get_file_str'](
                  source,
                  saltenv=__opts__.get('saltenv', 'base'))
    try:
        result = es.indices.create(index=index, body=body)
        return result.get('acknowledged', False) and result.get("shards_acknowledged", True)
    except elasticsearch.TransportError as e:
        if "index_already_exists_exception" == e.error:
            return True

        raise CommandExecutionError("Cannot create index {0}, server returned code {1} with message {2}".format(index, e.status_code, e.error))


def index_delete(index, hosts=None, profile=None):
    '''
    Delete an index

    index
        Index name

    CLI example::

        salt myminion elasticsearch.index_delete testindex
    '''
    es = _get_instance(hosts, profile)

    try:
        result = es.indices.delete(index=index)

        return result.get('acknowledged', False)
    except elasticsearch.exceptions.NotFoundError:
        return True
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot delete index {0}, server returned code {1} with message {2}".format(index, e.status_code, e.error))


def index_exists(index, hosts=None, profile=None):
    '''
    Return a boolean indicating whether given index exists

    index
        Index name

    CLI example::

        salt myminion elasticsearch.index_exists testindex
    '''
    es = _get_instance(hosts, profile)

    try:
        return es.indices.exists(index=index)
    except elasticsearch.exceptions.NotFoundError:
        return False
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot retrieve index {0}, server returned code {1} with message {2}".format(index, e.status_code, e.error))


def index_get(index, hosts=None, profile=None):
    '''
    Check for the existence of an index and if it exists, return it

    index
        Index name

    CLI example::

        salt myminion elasticsearch.index_get testindex
    '''
    es = _get_instance(hosts, profile)

    try:
        return es.indices.get(index=index)
    except elasticsearch.exceptions.NotFoundError:
        return None
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot retrieve index {0}, server returned code {1} with message {2}".format(index, e.status_code, e.error))


def index_open(index, allow_no_indices=True, expand_wildcards='closed', ignore_unavailable=True, hosts=None, profile=None):
    '''
    .. versionadded:: 2017.7.0

    Open specified index.

    index
        Index to be opened
    allow_no_indices
        Whether to ignore if a wildcard indices expression resolves into no concrete indices. (This includes _all string or when no indices have been specified)
    expand_wildcards
        Whether to expand wildcard expression to concrete indices that are open, closed or both., default ‘closed’, valid choices are: ‘open’, ‘closed’, ‘none’, ‘all’
    ignore_unavailable
        Whether specified concrete indices should be ignored when unavailable (missing or closed)

    CLI example::

        salt myminion elasticsearch.index_open testindex
    '''
    es = _get_instance(hosts, profile)

    try:
        result = es.indices.open(index=index, allow_no_indices=allow_no_indices, expand_wildcards=expand_wildcards, ignore_unavailable=ignore_unavailable)

        return result.get('acknowledged', False)
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot open index {0}, server returned code {1} with message {2}".format(index, e.status_code, e.error))


def index_close(index, allow_no_indices=True, expand_wildcards='open', ignore_unavailable=True, hosts=None, profile=None):
    '''
    .. versionadded:: 2017.7.0

    Close specified index.

    index
        Index to be closed
    allow_no_indices
        Whether to ignore if a wildcard indices expression resolves into no concrete indices. (This includes _all string or when no indices have been specified)
    expand_wildcards
        Whether to expand wildcard expression to concrete indices that are open, closed or both., default ‘open’, valid choices are: ‘open’, ‘closed’, ‘none’, ‘all’
    ignore_unavailable
        Whether specified concrete indices should be ignored when unavailable (missing or closed)

    CLI example::

        salt myminion elasticsearch.index_close testindex
    '''
    es = _get_instance(hosts, profile)

    try:
        result = es.indices.close(index=index, allow_no_indices=allow_no_indices, expand_wildcards=expand_wildcards, ignore_unavailable=ignore_unavailable)

        return result.get('acknowledged', False)
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot close index {0}, server returned code {1} with message {2}".format(index, e.status_code, e.error))


def mapping_create(index, doc_type, body=None, hosts=None, profile=None, source=None):
    '''
    Create a mapping in a given index

    index
        Index for the mapping
    doc_type
        Name of the document type
    body
        Mapping definition as specified in https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html
    source
        URL to file specifying mapping definition. Cannot be used in combination with ``body``.

    CLI example::

        salt myminion elasticsearch.mapping_create testindex user '{ "user" : { "properties" : { "message" : {"type" : "string", "store" : true } } } }'
    '''
    es = _get_instance(hosts, profile)
    if source and body:
        message = 'Either body or source should be specified but not both.'
        raise SaltInvocationError(message)
    if source:
        body = __salt__['cp.get_file_str'](
                  source,
                  saltenv=__opts__.get('saltenv', 'base'))
    try:
        result = es.indices.put_mapping(index=index, doc_type=doc_type, body=body)

        return result.get('acknowledged', False)
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot create mapping {0}, server returned code {1} with message {2}".format(index, e.status_code, e.error))


def mapping_delete(index, doc_type, hosts=None, profile=None):
    '''
    Delete a mapping (type) along with its data. As of Elasticsearch 5.0 this is no longer available.

    index
        Index for the mapping
    doc_type
        Name of the document type

    CLI example::

        salt myminion elasticsearch.mapping_delete testindex user
    '''
    es = _get_instance(hosts, profile)
    try:
        result = es.indices.delete_mapping(index=index, doc_type=doc_type)

        return result.get('acknowledged', False)
    except elasticsearch.exceptions.NotFoundError:
        return True
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot delete mapping {0}, server returned code {1} with message {2}".format(index, e.status_code, e.error))
    except AttributeError:
        raise CommandExecutionError("Method is not applicable for Elasticsearch 5.0+")


def mapping_get(index, doc_type, hosts=None, profile=None):
    '''
    Retrieve mapping definition of index or index/type

    index
        Index for the mapping
    doc_type
        Name of the document type

    CLI example::

        salt myminion elasticsearch.mapping_get testindex user
    '''
    es = _get_instance(hosts, profile)

    try:
        return es.indices.get_mapping(index=index, doc_type=doc_type)
    except elasticsearch.exceptions.NotFoundError:
        return None
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot retrieve mapping {0}, server returned code {1} with message {2}".format(index, e.status_code, e.error))


def index_template_create(name, body=None, hosts=None, profile=None, source=None):
    '''
    Create an index template

    name
        Index template name

    body
        Template definition as specified in http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html

    source
        URL to file specifying template definition. Cannot be used in combination with ``body``.

    CLI example::

        salt myminion elasticsearch.index_template_create testindex_templ '{ "template": "logstash-*", "order": 1, "settings": { "number_of_shards": 1 } }'
    '''
    es = _get_instance(hosts, profile)
    if source and body:
        message = 'Either body or source should be specified but not both.'
        raise SaltInvocationError(message)
    if source:
        body = __salt__['cp.get_file_str'](
                  source,
                  saltenv=__opts__.get('saltenv', 'base'))
    try:
        result = es.indices.put_template(name=name, body=body)
        return result.get('acknowledged', False)
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot create template {0}, server returned code {1} with message {2}".format(name, e.status_code, e.error))


def index_template_delete(name, hosts=None, profile=None):
    '''
    Delete an index template (type) along with its data

    name
        Index template name

    CLI example::

        salt myminion elasticsearch.index_template_delete testindex_templ user
    '''
    es = _get_instance(hosts, profile)
    try:
        result = es.indices.delete_template(name=name)

        return result.get('acknowledged', False)
    except elasticsearch.exceptions.NotFoundError:
        return True
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot delete template {0}, server returned code {1} with message {2}".format(name, e.status_code, e.error))


def index_template_exists(name, hosts=None, profile=None):
    '''
    Return a boolean indicating whether given index template exists

    name
        Index template name

    CLI example::

        salt myminion elasticsearch.index_template_exists testindex_templ
    '''
    es = _get_instance(hosts, profile)
    try:
        return es.indices.exists_template(name=name)
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot retrieve template {0}, server returned code {1} with message {2}".format(name, e.status_code, e.error))


def index_template_get(name, hosts=None, profile=None):
    '''
    Retrieve template definition of index or index/type

    name
        Index template name

    CLI example::

        salt myminion elasticsearch.index_template_get testindex_templ
    '''
    es = _get_instance(hosts, profile)

    try:
        return es.indices.get_template(name=name)
    except elasticsearch.exceptions.NotFoundError:
        return None
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot retrieve template {0}, server returned code {1} with message {2}".format(name, e.status_code, e.error))


def pipeline_get(id, hosts=None, profile=None):
    '''
    .. versionadded:: 2017.7.0

    Retrieve Ingest pipeline definition. Available since Elasticsearch 5.0.

    id
        Pipeline id

    CLI example::

        salt myminion elasticsearch.pipeline_get mypipeline
    '''
    es = _get_instance(hosts, profile)

    try:
        return es.ingest.get_pipeline(id=id)
    except elasticsearch.NotFoundError:
        return None
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot create pipeline {0}, server returned code {1} with message {2}".format(id, e.status_code, e.error))
    except AttributeError:
        raise CommandExecutionError("Method is applicable only for Elasticsearch 5.0+")


def pipeline_delete(id, hosts=None, profile=None):
    '''
    .. versionadded:: 2017.7.0

    Delete Ingest pipeline. Available since Elasticsearch 5.0.

    id
        Pipeline id

    CLI example::

        salt myminion elasticsearch.pipeline_delete mypipeline
    '''
    es = _get_instance(hosts, profile)

    try:
        ret = es.ingest.delete_pipeline(id=id)
        return ret.get('acknowledged', False)
    except elasticsearch.NotFoundError:
        return True
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot delete pipeline {0}, server returned code {1} with message {2}".format(id, e.status_code, e.error))
    except AttributeError:
        raise CommandExecutionError("Method is applicable only for Elasticsearch 5.0+")


def pipeline_create(id, body, hosts=None, profile=None):
    '''
    .. versionadded:: 2017.7.0

    Create Ingest pipeline by supplied definition. Available since Elasticsearch 5.0.

    id
        Pipeline id
    body
        Pipeline definition as specified in https://www.elastic.co/guide/en/elasticsearch/reference/master/pipeline.html

    CLI example::

        salt myminion elasticsearch.pipeline_create mypipeline '{"description": "my custom pipeline", "processors": [{"set" : {"field": "collector_timestamp_millis", "value": "{{_ingest.timestamp}}"}}]}'
    '''
    es = _get_instance(hosts, profile)
    try:
        out = es.ingest.put_pipeline(id=id, body=body)
        return out.get('acknowledged', False)
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot create pipeline {0}, server returned code {1} with message {2}".format(id, e.status_code, e.error))
    except AttributeError:
        raise CommandExecutionError("Method is applicable only for Elasticsearch 5.0+")


def pipeline_simulate(id, body, verbose=False, hosts=None, profile=None):
    '''
    .. versionadded:: 2017.7.0

    Simulate existing Ingest pipeline on provided data. Available since Elasticsearch 5.0.

    id
        Pipeline id
    body
        Pipeline definition as specified in https://www.elastic.co/guide/en/elasticsearch/reference/master/pipeline.html
    verbose
        Specify if the output should be more verbose

    CLI example::

        salt myminion elasticsearch.pipeline_simulate mypipeline '{"docs":[{"_index":"index","_type":"type","_id":"id","_source":{"foo":"bar"}},{"_index":"index","_type":"type","_id":"id","_source":{"foo":"rab"}}]}' verbose=True
    '''
    es = _get_instance(hosts, profile)
    try:
        return es.ingest.simulate(id=id, body=body, verbose=verbose)
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot simulate pipeline {0}, server returned code {1} with message {2}".format(id, e.status_code, e.error))
    except AttributeError:
        raise CommandExecutionError("Method is applicable only for Elasticsearch 5.0+")


def search_template_get(id, hosts=None, profile=None):
    '''
    .. versionadded:: 2017.7.0

    Obtain existing search template definition.

    id
        Template ID

    CLI example::

        salt myminion elasticsearch.search_template_get mytemplate
    '''
    es = _get_instance(hosts, profile)

    try:
        return es.get_template(id=id)
    except elasticsearch.NotFoundError:
        return None
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot obtain search template {0}, server returned code {1} with message {2}".format(id, e.status_code, e.error))


def search_template_create(id, body, hosts=None, profile=None):
    '''
    .. versionadded:: 2017.7.0

    Create search template by supplied definition

    id
        Template ID
    body
        Search template definition

    CLI example::

        salt myminion elasticsearch.search_template_create mytemplate '{"template":{"query":{"match":{"title":"{{query_string}}"}}}}'
    '''
    es = _get_instance(hosts, profile)

    try:
        result = es.put_template(id=id, body=body)

        return result.get('acknowledged', False)
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot create search template {0}, server returned code {1} with message {2}".format(id, e.status_code, e.error))


def search_template_delete(id, hosts=None, profile=None):
    '''
    .. versionadded:: 2017.7.0

    Delete existing search template definition.

    id
        Template ID

    CLI example::

        salt myminion elasticsearch.search_template_delete mytemplate
    '''
    es = _get_instance(hosts, profile)

    try:
        result = es.delete_template(id=id)

        return result.get('acknowledged', False)
    except elasticsearch.NotFoundError:
        return True
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot delete search template {0}, server returned code {1} with message {2}".format(id, e.status_code, e.error))


def repository_get(name, local=False, hosts=None, profile=None):
    '''
    .. versionadded:: 2017.7.0

    Get existing repository details.

    name
        Repository name
    local
        Retrieve only local information, default is false

    CLI example::

        salt myminion elasticsearch.repository_get testrepo
    '''
    es = _get_instance(hosts, profile)

    try:
        return es.snapshot.get_repository(repository=name, local=local)
    except elasticsearch.NotFoundError:
        return None
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot obtain repository {0}, server returned code {1} with message {2}".format(name, e.status_code, e.error))


def repository_create(name, body, hosts=None, profile=None):
    '''
    .. versionadded:: 2017.7.0

    Create repository for storing snapshots. Note that shared repository paths have to be specified in path.repo Elasticsearch configuration option.

    name
        Repository name
    body
        Repository definition as in https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html

    CLI example::

        salt myminion elasticsearch.repository_create testrepo '{"type":"fs","settings":{"location":"/tmp/test","compress":true}}'
    '''
    es = _get_instance(hosts, profile)

    try:
        result = es.snapshot.create_repository(repository=name, body=body)

        return result.get('acknowledged', False)
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot create repository {0}, server returned code {1} with message {2}".format(name, e.status_code, e.error))


def repository_delete(name, hosts=None, profile=None):
    '''
    .. versionadded:: 2017.7.0

    Delete existing repository.

    name
        Repository name

    CLI example::

        salt myminion elasticsearch.repository_delete testrepo
    '''
    es = _get_instance(hosts, profile)

    try:
        result = es.snapshot.delete_repository(repository=name)

        return result.get('acknowledged', False)
    except elasticsearch.NotFoundError:
        return True
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot delete repository {0}, server returned code {1} with message {2}".format(name, e.status_code, e.error))


def repository_verify(name, hosts=None, profile=None):
    '''
    .. versionadded:: 2017.7.0

    Obtain list of cluster nodes which successfully verified this repository.

    name
        Repository name

    CLI example::

        salt myminion elasticsearch.repository_verify testrepo
    '''
    es = _get_instance(hosts, profile)

    try:
        return es.snapshot.verify_repository(repository=name)
    except elasticsearch.NotFoundError:
        return None
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot verify repository {0}, server returned code {1} with message {2}".format(name, e.status_code, e.error))


def snapshot_status(repository=None, snapshot=None, ignore_unavailable=False, hosts=None, profile=None):
    '''
    .. versionadded:: 2017.7.0

    Obtain status of all currently running snapshots.

    repository
        Particular repository to look for snapshots
    snapshot
        Snapshot name
    ignore_unavailable
        Ignore unavailable snapshots

    CLI example::

        salt myminion elasticsearch.snapshot_status ignore_unavailable=True
    '''
    es = _get_instance(hosts, profile)

    try:
        return es.snapshot.status(repository=repository, snapshot=snapshot, ignore_unavailable=ignore_unavailable)
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot obtain snapshot status, server returned code {0} with message {1}".format(e.status_code, e.error))


def snapshot_get(repository, snapshot, ignore_unavailable=False, hosts=None, profile=None):
    '''
    .. versionadded:: 2017.7.0

    Obtain snapshot residing in specified repository.

    repository
        Repository name
    snapshot
        Snapshot name, use _all to obtain all snapshots in specified repository
    ignore_unavailable
        Ignore unavailable snapshots

    CLI example::

        salt myminion elasticsearch.snapshot_get testrepo testsnapshot
    '''
    es = _get_instance(hosts, profile)

    try:
        return es.snapshot.get(repository=repository, snapshot=snapshot, ignore_unavailable=ignore_unavailable)
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot obtain details of snapshot {0} in repository {1}, server returned code {2} with message {3}".format(snapshot, repository, e.status_code, e.error))


def snapshot_create(repository, snapshot, body=None, hosts=None, profile=None):
    '''
    .. versionadded:: 2017.7.0

    Create snapshot in specified repository by supplied definition.

    repository
        Repository name
    snapshot
        Snapshot name
    body
        Snapshot definition as in https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html

    CLI example::

        salt myminion elasticsearch.snapshot_create testrepo testsnapshot '{"indices":"index_1,index_2","ignore_unavailable":true,"include_global_state":false}'
    '''
    es = _get_instance(hosts, profile)

    try:
        response = es.snapshot.create(repository=repository, snapshot=snapshot, body=body)

        return response.get('accepted', False)
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot create snapshot {0} in repository {1}, server returned code {2} with message {3}".format(snapshot, repository, e.status_code, e.error))


def snapshot_restore(repository, snapshot, body=None, hosts=None, profile=None):
    '''
    .. versionadded:: 2017.7.0

    Restore existing snapshot in specified repository by supplied definition.

    repository
        Repository name
    snapshot
        Snapshot name
    body
        Restore definition as in https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html

    CLI example::

        salt myminion elasticsearch.snapshot_restore testrepo testsnapshot '{"indices":"index_1,index_2","ignore_unavailable":true,"include_global_state":true}'
    '''
    es = _get_instance(hosts, profile)

    try:
        response = es.snapshot.restore(repository=repository, snapshot=snapshot, body=body)

        return response.get('accepted', False)
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot restore snapshot {0} in repository {1}, server returned code {2} with message {3}".format(snapshot, repository, e.status_code, e.error))


def snapshot_delete(repository, snapshot, hosts=None, profile=None):
    '''
    .. versionadded:: 2017.7.0

    Delete snapshot from specified repository.

    repository
        Repository name
    snapshot
        Snapshot name

    CLI example::

        salt myminion elasticsearch.snapshot_delete testrepo testsnapshot
    '''
    es = _get_instance(hosts, profile)

    try:
        result = es.snapshot.delete(repository=repository, snapshot=snapshot)

        return result.get('acknowledged', False)
    except elasticsearch.NotFoundError:
        return True
    except elasticsearch.TransportError as e:
        raise CommandExecutionError("Cannot delete snapshot {0} from repository {1}, server returned code {2} with message {3}".format(snapshot, repository, e.status_code, e.error))