odin-detector/odin-data

View on GitHub
python/src/odin_data/control/ipc_channel.py

Summary

Maintainability
A
1 hr
Test Coverage
"""Implementation of odin_data inter-process communication channels.

This module implements the ODIN data IpcChannel class for inter-process
communication via ZeroMQ sockets.

Tim Nicholls, STFC Application Engineering Group
"""
import random
import zmq
from odin_data.util import cast_bytes, cast_unicode

class IpcChannelException(Exception):
    """Exception class for IpcChannel.

    This class implements a simple exception for use with IpcChannel, providing
    a readable message and optionsal error code.
    """

    def __init__(self, msg, errno=None):
        """Initalise the exception.

        :param msg: readable message associated with the exception
        :param errno: optional error number assocated with the exception
        """
        super(IpcChannelException, self).__init__()
        self.msg = msg
        self.errno = errno

    def __str__(self):
        """Return string representation of the exception message."""
        return str(self.msg)


class IpcChannel(object):
    """Inter-process communication channel class.

    This class provides ZeroMQ-based interprocess communication channels to
    odin-data.
    """
    # pylint: disable=no-member
    CHANNEL_TYPE_PAIR = zmq.PAIR
    CHANNEL_TYPE_REQ = zmq.REQ
    CHANNEL_TYPE_SUB = zmq.SUB
    CHANNEL_TYPE_PUB = zmq.PUB
    CHANNEL_TYPE_DEALER = zmq.DEALER
    CHANNEL_TYPE_ROUTER = zmq.ROUTER

    EVENT_ACCEPTED = zmq.EVENT_CONNECTED
    EVENT_DISCONNECTED = zmq.EVENT_DISCONNECTED

    POLLIN = zmq.POLLIN
    POLLOUT = zmq.POLLOUT
    POLLERR = zmq.POLLERR
    # pylint: enable=no-member

    def __init__(self, channel_type, endpoint=None, context=None, identity=None):
        """Initalise the IpcChannel object.

        :param channel_type: ZeroMQ socket type, using CHANNEL_TYPE_xxx constants
        :param endpoint: URI of channel endpoint, can be specified later
        :param context: ZeroMQ context, will be initialised if not given
        :param identity: channel identity for DEALER type sockets
        """
        # Initalise channel type and endpoint if given
        self.channel_type = channel_type
        if endpoint:
            self.endpoint = endpoint

        # Initialise the ZeroMQ context or obtain the current instance
        self.context = context or zmq.Context().instance()

        # Create the socket
        self.socket = self.context.socket(self.channel_type)

        # If the socket type is DEALER, set the identity, chosing a random
        # UUID4 value if not specified
        if self.channel_type == self.CHANNEL_TYPE_DEALER:
            if identity is None:
                identity = "{:04x}-{:04x}".format(
                    random.randrange(0x10000), random.randrange(0x10000)
                )
            self.identity = identity
            self.socket.setsockopt(zmq.IDENTITY, cast_bytes(identity))  # pylint: disable=no-member

    def bind(self, endpoint=None):
        """Bind the IpcChannel to an endpoint.

        :param: endpoint: endpoint URI to use, otherwise use initialised value
        """
        if endpoint:
            self.endpoint = endpoint

        self.socket.bind(self.endpoint)

    def connect(self, endpoint=None):
        """Connect the IpcChannel to an endpoint.

        :param: endpoint: endpoint URI to use, otherwise use initialised value
        """
        if endpoint:
            self.endpoint = endpoint

        self.socket.connect(self.endpoint)

    def close(self):
        """Close the IpcChannel socket."""
        self.socket.close()

    def send(self, data):
        """Send data to the IpcChannel.

        :param: data to send on channel
        """
        # Ensure a byte stream is ready to be sent on the socket
        data = cast_bytes(data)

        # Send the data
        self.socket.send(data)

    def send_multipart(self, data):
        """
        Send data to the IpcChannel as a multi part message
        :param: data to send, as an iterable object
        """
                
        self.socket.send_multipart(data)

    def recv(self):
        """Recieve data from the IpcChannel.

        :return: returns either the data received, or, in the case of DEALER/ROUTER
        channels, a tuple of DEALER channel identity and data received
        """
        # Use multipart receive to cope with data coming from DEALER sockets,
        # where the message is prefixed by the socket identity. Convert incoming
        # data back to native strings if required
        data = list(map(cast_unicode, self.socket.recv_multipart()))

        # If our local channel is a router, the remote endpoint should (must) be a
        # dealer, in which case pop the identity off the front of the data and
        # return both.
        if self.channel_type == self.CHANNEL_TYPE_ROUTER:
            identity = data.pop(0)
            return (identity, data)

        return data[0]

    def poll(self, timeout=None):
        """Poll the IpcChannel socket for I/O events.

        :param timeout: poll timeout in milliseconds
        :return list of poll events for the socket
        """
        pollevts = self.socket.poll(timeout)
        return pollevts

    def subscribe(self, topic=b''):
        """Set the topic subscription for SUB sockets.

        :param topic: topic to subscribe to
        """
        if self.channel_type == self.CHANNEL_TYPE_SUB:
            self.socket.setsockopt(zmq.SUBSCRIBE, topic)  # pylint: disable=no-member
        else:
            raise IpcChannelException("Attmped to set topic subscription on non-SUB channel socket")