saltstack/salt

View on GitHub
salt/transport/client.py

Summary

Maintainability
D
2 days
Test Coverage
# -*- coding: utf-8 -*-
'''
Encapsulate the different transports available to Salt.

This includes client side transport, for the ReqServer and the Publisher
'''

# Import Python Libs
from __future__ import absolute_import, print_function, unicode_literals
import logging

# Import Salt Libs
from salt.utils.asynchronous import SyncWrapper

log = logging.getLogger(__name__)


class ReqChannel(object):
    '''
    Synchronous entry point for request channels to the ReqServer.

    ``factory`` is the only method with real behaviour in this class; the
    remaining methods raise ``NotImplementedError``.
    '''
    @staticmethod
    def factory(opts, **kwargs):
        '''
        Build the synchronous counterpart of ``AsyncReqChannel``.

        :param opts: handed to ``SyncWrapper`` as the one-element positional
                     argument tuple ``(opts,)``
        :param kwargs: handed to ``SyncWrapper`` as the keyword-argument dict
        :return: the ``SyncWrapper`` instance
        '''
        # Every sync interface is the async implementation behind a wrapper
        return SyncWrapper(AsyncReqChannel.factory, (opts,), kwargs)

    def send(self, load, tries=3, timeout=60, raw=False):
        '''
        Send "load" to the master.

        Not implemented in this class.

        :raises NotImplementedError: always
        '''
        raise NotImplementedError()

    def crypted_transfer_decode_dictentry(self, load, dictkey=None, tries=3, timeout=60):
        '''
        Send "load" to the master so that only the minion and the master
        (and no other minion) can read it.

        Not implemented in this class.

        :raises NotImplementedError: always
        '''
        raise NotImplementedError()


class PushChannel(object):
    '''
    Synchronous entry point for the push side of push/pull IPC.
    '''
    @staticmethod
    def factory(opts, **kwargs):
        '''
        Build the synchronous counterpart of ``AsyncPushChannel``.

        :param opts: handed to ``SyncWrapper`` as the one-element positional
                     argument tuple ``(opts,)``
        :param kwargs: handed to ``SyncWrapper`` as the keyword-argument dict
        :return: the ``SyncWrapper`` instance
        '''
        async_factory = AsyncPushChannel.factory
        positional = (opts,)
        return SyncWrapper(async_factory, positional, kwargs)

    def send(self, load, tries=3, timeout=60):
        '''
        Send load across IPC push.

        Not implemented in this class.

        :raises NotImplementedError: always
        '''
        raise NotImplementedError()


class PullChannel(object):
    '''
    Synchronous entry point for the pull side of push/pull IPC.
    '''
    @staticmethod
    def factory(opts, **kwargs):
        '''
        Build the synchronous counterpart of ``AsyncPullChannel``.

        :param opts: handed to ``SyncWrapper`` as the one-element positional
                     argument tuple ``(opts,)``
        :param kwargs: handed to ``SyncWrapper`` as the keyword-argument dict
        :return: the ``SyncWrapper`` instance
        '''
        return SyncWrapper(AsyncPullChannel.factory, (opts,), kwargs)


# TODO: better doc strings
class AsyncChannel(object):
    '''
    Common base class for the asynchronous communication channels.
    '''
    # The Tornado TCPClient relies on a Resolver. This class-level flag is
    # visible to both AsyncReqChannel and AsyncPubChannel, whose factories
    # consult it so the Resolver gets configured before its first use.
    _resolver_configured = False

    @classmethod
    def _config_resolver(cls, num_threads=10):
        '''
        Configure Tornado's Resolver to use the threaded implementation and
        record on ``cls`` that this has been done.

        :param num_threads: forwarded to ``Resolver.configure`` as
                            ``num_threads``
        '''
        # Imported lazily so tornado is only required when this is called
        import tornado.netutil
        tornado.netutil.Resolver.configure(
            'tornado.netutil.ThreadedResolver',
            num_threads=num_threads,
        )
        cls._resolver_configured = True


# TODO: better doc strings
class AsyncReqChannel(AsyncChannel):
    '''
    Factory class for asynchronous communication channels to the ReqServer.
    '''
    @classmethod
    def factory(cls, opts, **kwargs):
        '''
        Instantiate the request channel class matching the configured
        transport.

        The transport name is read from ``opts['transport']`` when present,
        otherwise from ``opts['pillar']['master']['transport']`` when present,
        and otherwise defaults to ``'zeromq'``.

        :param opts: options mapping; also passed on to the channel class
        :param kwargs: passed on to the channel class
        :return: an ``AsyncZeroMQReqChannel``, ``AsyncTCPReqChannel`` or
                 ``AsyncLocalChannel`` instance
        :raises Exception: if the transport is not tcp, zeromq or local
        '''
        # Work out which transport to use
        if 'transport' in opts:
            ttype = opts['transport']
        else:
            master_pillar = opts.get('pillar', {}).get('master', {})
            if 'transport' in master_pillar:
                ttype = master_pillar['transport']
            else:
                # Default to ZeroMQ for now
                ttype = 'zeromq'

        # Dispatch on the transport; each backend is imported only on demand
        if ttype == 'zeromq':
            import salt.transport.zeromq
            return salt.transport.zeromq.AsyncZeroMQReqChannel(opts, **kwargs)

        if ttype == 'tcp':
            # The resolver must be configured before the first TCP channel
            # TODO: add opt to specify number of resolver threads
            if not cls._resolver_configured:
                AsyncChannel._config_resolver()
            import salt.transport.tcp
            return salt.transport.tcp.AsyncTCPReqChannel(opts, **kwargs)

        if ttype == 'local':
            import salt.transport.local
            return salt.transport.local.AsyncLocalChannel(opts, **kwargs)

        raise Exception(
            'Channels are only defined for tcp, zeromq, and local'
        )

    def send(self, load, tries=3, timeout=60, raw=False):
        '''
        Send "load" to the master.

        Not implemented in this class.

        :raises NotImplementedError: always
        '''
        raise NotImplementedError()

    def crypted_transfer_decode_dictentry(self, load, dictkey=None, tries=3, timeout=60):
        '''
        Send "load" to the master so that only the minion and the master
        (and no other minion) can read it.

        Not implemented in this class.

        :raises NotImplementedError: always
        '''
        raise NotImplementedError()


class AsyncPubChannel(AsyncChannel):
    '''
    Factory class for subscription channels to the master's Publisher.
    '''
    @classmethod
    def factory(cls, opts, **kwargs):
        '''
        Instantiate the publish channel class matching the configured
        transport.

        The transport name is read from ``opts['transport']`` when present,
        otherwise from ``opts['pillar']['master']['transport']`` when present,
        and otherwise defaults to ``'zeromq'``.

        :param opts: options mapping; also passed on to the channel class.
                     ``opts['detect_mode']`` is set to ``True`` when the
                     transport is ``'detect'``
        :param kwargs: passed on to the channel class
        :return: an ``AsyncZeroMQPubChannel``, ``AsyncTCPPubChannel`` or
                 ``AsyncLocalPubChannel`` instance
        :raises Exception: if the transport is not tcp, zeromq or local
        '''
        # Work out which transport to use
        if 'transport' in opts:
            ttype = opts['transport']
        else:
            master_pillar = opts.get('pillar', {}).get('master', {})
            if 'transport' in master_pillar:
                ttype = master_pillar['transport']
            else:
                # Default to ZeroMQ for now
                ttype = 'zeromq'

        # NOTE(review): ttype is not reassigned here, so a 'detect' transport
        # logs "using detect" and then reaches the final raise below.
        # Confirm whether callers are expected to resolve 'detect' beforehand.
        if ttype == 'detect':
            opts['detect_mode'] = True
            log.info('Transport is set to detect; using %s', ttype)

        # Dispatch on the transport; each backend is imported only on demand
        if ttype == 'zeromq':
            import salt.transport.zeromq
            return salt.transport.zeromq.AsyncZeroMQPubChannel(opts, **kwargs)

        if ttype == 'tcp':
            # The resolver must be configured before the first TCP channel
            # TODO: add opt to specify number of resolver threads
            if not cls._resolver_configured:
                AsyncChannel._config_resolver()
            import salt.transport.tcp
            return salt.transport.tcp.AsyncTCPPubChannel(opts, **kwargs)

        if ttype == 'local':  # TODO:
            import salt.transport.local
            return salt.transport.local.AsyncLocalPubChannel(opts, **kwargs)

        raise Exception(
            'Channels are only defined for tcp, zeromq, and local'
        )

    def connect(self):
        '''
        Return a future which completes once connected to the remote
        publisher.

        Not implemented in this class.

        :raises NotImplementedError: always
        '''
        raise NotImplementedError()

    def on_recv(self, callback):
        '''
        Pass received jobs (decoded) to ``callback``.

        Not implemented in this class.

        :raises NotImplementedError: always
        '''
        raise NotImplementedError()


class AsyncPushChannel(object):
    '''
    Factory class for IPC push channels.
    '''
    @staticmethod
    def factory(opts, **kwargs):
        '''
        Create an ``IPCMessageClient`` from ``opts`` and ``kwargs``.

        If we have additional IPC transports other than UxD and TCP, add them
        here.
        '''
        # FIXME for now, just UXD
        # A single choice makes the factory approach pointless for the moment,
        # but it is meant to be extended later
        import salt.transport.ipc
        push_client = salt.transport.ipc.IPCMessageClient(opts, **kwargs)
        return push_client


class AsyncPullChannel(object):
    '''
    Factory class for IPC pull channels.
    '''
    @staticmethod
    def factory(opts, **kwargs):
        '''
        Create an ``IPCMessageServer`` from ``opts`` and ``kwargs``.

        If we have additional IPC transports other than UXD and TCP, add them
        here.
        '''
        import salt.transport.ipc
        pull_server = salt.transport.ipc.IPCMessageServer(opts, **kwargs)
        return pull_server

# Additional IPC messaging patterns (e.g. router/dealer, pub/sub) should provide their interfaces here.

# EOF