opensistemas-hub/osbrain

View on GitHub
osbrain/address.py

Summary

Maintainability
C
1 day
Test Coverage
"""
Implementation of address-related features.
"""
from ipaddress import ip_address

import zmq

from .common import unique_identifier


def address_to_host_port(addr):
    """
    Try to convert an address to a (host, port) tuple.

    Parameters
    ----------
    addr : str, SocketAddress

    Returns
    -------
    tuple
        A (host, port) tuple formed with the corresponding data.
    """
    if addr is None:
        return (None, None)
    # Try the most common case (well-defined types)
    try:
        return _common_address_to_host_port(addr)
    except TypeError:
        pass
    # Try to do something anyway
    if hasattr(addr, 'host') and hasattr(addr, 'port'):
        return (addr.host, addr.port)
    raise ValueError('Unsupported address type "%s"!' % type(addr))


def _common_address_to_host_port(addr):
    """
    Try to convert an address to a (host, port) tuple.

    This function is meant to be used with well-known types. For a more
    general case, use the `address_to_host_port` function instead.

    Parameters
    ----------
    addr : str, SocketAddress, AgentAddress

    Returns
    -------
    tuple
        A (host, port) tuple formed with the corresponding data.
    """
    if isinstance(addr, SocketAddress):
        return (addr.host, addr.port)
    if isinstance(addr, AgentAddress):
        return (addr.address.host, addr.address.port)
    if isinstance(addr, str):
        aux = addr.split(':')
        if len(aux) == 1:
            port = None
        else:
            port = int(aux[-1])
        host = aux[0]
        return (host, port)
    raise TypeError('Unsupported address type "%s"!' % type(addr))


def guess_kind(kind):
    """
    Guess if a kind string is an AgentAddressKind or AgentChannelKind.

    Parameters
    ----------
    kind : str
        The AgentAddressKind or AgentChannelKind in string format.

    Returns
    ----------
    AgentAddressKind or AgentChannelKind
        The actual kind type.
    """
    try:
        return AgentAddressKind(kind)
    except ValueError:
        return AgentChannelKind(kind)


class AgentAddressTransport(str):
    """
    Agent's address transport class. It can be 'tcp', 'ipc' or 'inproc'.
    """

    def __new__(cls, value):
        if value not in ['tcp', 'ipc', 'inproc']:
            raise ValueError('Invalid address transport "%s"!' % value)
        return super().__new__(cls, value)


class AgentAddressRole(str):
    """
    Agent's address role class. It can either be ``'server'`` or ``'client'``.
    """

    def __new__(cls, value):
        if value not in ['server', 'client']:
            raise ValueError('Invalid address role "%s"!' % value)
        return super().__new__(cls, value)

    def twin(self):
        """
        Get the twin role of the current one. ``'server'`` would be the twin
        of ``'client'`` and viceversa.

        Returns
        -------
        AgentAddressRole
            The twin role.
        """
        if self == 'server':
            return self.__class__('client')
        return self.__class__('server')


class AgentAddressKind(str):
    """
    Agent's address kind class.

    This kind represents the communication pattern being used by the agent
    address: REP, PULL, PUB...
    """

    TWIN = {
        'REQ': 'REP',
        'REP': 'REQ',
        'PUSH': 'PULL',
        'PULL': 'PUSH',
        'PUB': 'SUB',
        'SUB': 'PUB',
        'PULL_SYNC_PUB': 'PUSH_SYNC_SUB',
        'PUSH_SYNC_SUB': 'PULL_SYNC_PUB',
    }
    ZMQ_KIND_CONVERSION = {
        'REQ': zmq.REQ,
        'REP': zmq.REP,
        'PUSH': zmq.PUSH,
        'PULL': zmq.PULL,
        'PUB': zmq.PUB,
        'SUB': zmq.SUB,
        'PULL_SYNC_PUB': zmq.PULL,
        'PUSH_SYNC_SUB': zmq.PUSH,
    }
    REQUIRE_HANDLER = ('REP', 'PULL', 'SUB', 'PULL_SYNC_PUB')

    def __new__(cls, kind):
        if kind not in cls.TWIN.keys():
            raise ValueError('Invalid address kind "%s"!' % kind)
        return super().__new__(cls, kind)

    def zmq(self):
        """
        Get the equivalent ZeroMQ socket kind.

        Returns
        -------
        int
        """
        return self.ZMQ_KIND_CONVERSION[self]

    def requires_handler(self):
        """
        Whether the Agent's address kind requires a handler or not.
        A socket which processes incoming messages would require a
        handler (i.e. 'REP', 'PULL', 'SUB'...).

        Returns
        -------
        bool
        """
        return self in self.REQUIRE_HANDLER

    def twin(self):
        """
        Get the twin kind of the current one.

        ``REQ`` would be the twin of ``REP`` and viceversa, ``PUB`` would be
        the twin of ``SUB`` and viceversa, etc.

        Returns
        -------
        AgentAddressKind
            The twin kind of the current one.
        """
        return self.__class__(self.TWIN[self])


class AgentAddressSerializer(str):
    """
    Agent's address serializer class.

    Each communication channel will have a serializer.

    Note that for ``raw`` message passing, everything must be on bytes, and the
    programmer is the one responsible for converting data to bytes.

    Parameters
    ----------
    serializer_type : str
        Serializer type (i.e.: 'raw', 'pickle', 'cloudpickle', 'dill', 'json').
    """

    SERIALIZER_SIMPLE = ('raw',)
    SERIALIZER_SEPARATOR = ('pickle', 'cloudpickle', 'dill', 'json')

    def __new__(cls, value):
        if value not in cls.SERIALIZER_SIMPLE + cls.SERIALIZER_SEPARATOR:
            raise ValueError('Invalid serializer type %s!' % value)
        return super().__new__(cls, value)

    def __init__(self, value):
        self.requires_separator = value in self.SERIALIZER_SEPARATOR


class SocketAddress(object):
    """
    Socket address information consisting on the host and port.

    Parameters
    ----------
    host : str, ipaddress.IPv4Address
        IP address.
    port : int
        Port number.

    Attributes
    ----------
    host : ipaddress.IPv4Address
        IP address.
    port : int
        Port number.
    """

    def __init__(self, host, port):
        assert isinstance(
            port, int
        ), 'Incorrect parameter port on SocketAddress; expecting type int.'
        self.host = str(ip_address(host))
        self.port = port

    def __repr__(self):
        """
        Return the string representation of the SocketAddress.

        Returns
        -------
        representation : str
        """
        return '%s:%s' % (self.host, self.port)

    def __hash__(self):
        return hash(self.host) ^ hash(self.port)

    def __eq__(self, other):
        if not isinstance(other, SocketAddress):
            return False
        return self.host == other.host and self.port == other.port


class AgentAddress:
    """
    Agent address information consisting on the transport protocol, address,
    kind and role.

    Parameters
    ----------
    transport : str, AgentAddressTransport
        Agent transport protocol.
    address : str
        Agent address.
    kind : str, AgentAddressKind
        Agent kind.
    role : str, AgentAddressRole
        Agent role.
    serializer : str
        Agent serializer type.

    Attributes
    ----------
    transport : str, AgentAddressTransport
        Agent transport protocol.
    address : str, SocketAddress
        Agent address.
    kind : AgentAddressKind
        Agent kind.
    role : AgentAddressRole
        Agent role.
    serializer : AgentAddressSerializer
        Agent serializer.
    """

    def __init__(self, transport, address, kind, role, serializer):
        if transport == 'tcp':
            address = SocketAddress(*address_to_host_port(address))
        self.transport = AgentAddressTransport(transport)
        self.address = address
        self.kind = AgentAddressKind(kind)
        self.role = AgentAddressRole(role)
        self.serializer = AgentAddressSerializer(serializer)

    def __repr__(self):
        """
        Return the string representation of the AgentAddress.

        Returns
        -------
        representation : str
        """
        return 'AgentAddress(%s, %s, %s, %s, %s)' % (
            self.transport,
            self.address,
            self.kind,
            self.role,
            self.serializer,
        )

    def __hash__(self):
        return (
            hash(self.transport)
            ^ hash(self.address)
            ^ hash(self.kind)
            ^ hash(self.role)
            ^ hash(self.serializer)
        )

    def __eq__(self, other):
        if not isinstance(other, AgentAddress):
            return False
        return (
            self.transport == other.transport
            and self.address == other.address
            and self.kind == other.kind
            and self.role == other.role
            and self.serializer == other.serializer
        )

    def twin(self):
        """
        Return the twin address of the current one.

        While the `host` and `port` are kept for the twin, the `kind` and
        `role` change to their corresponding twins, according to the
        rules defined in the respective classes.

        Returns
        -------
        AgentAddress
            The twin address of the current one.
        """
        kind = self.kind.twin()
        role = self.role.twin()
        return self.__class__(
            self.transport, self.address, kind, role, self.serializer
        )


class AgentChannelKind(str):
    """
    Agent's channel kind class.

    This kind represents the communication pattern being used by the agent
    channel: ASYNC_REP, STREAM...
    """

    TWIN = {
        'ASYNC_REP': 'ASYNC_REQ',
        'ASYNC_REQ': 'ASYNC_REP',
        'SYNC_PUB': 'SYNC_SUB',
        'SYNC_SUB': 'SYNC_PUB',
    }

    def __new__(cls, kind):
        if kind not in cls.TWIN.keys():
            raise ValueError('Invalid channel kind "%s"!' % kind)
        return super().__new__(cls, kind)

    def twin(self):
        """
        Get the twin kind of the current one.

        ``REQ`` would be the twin of ``REP`` and viceversa, ``PUB`` would be
        the twin of ``SUB`` and viceversa, etc.

        Returns
        -------
        AgentChannelKind
        """
        return self.__class__(self.TWIN[self])


class AgentChannel:
    """
    Agent channel information.

    Channels are communication means with sender and receiver in both sides
    (i.e.: PULL+PUB - PUSH-SUB or PULL+PUSH - PUSH+PULL).

    Parameters
    ----------
    kind : AgentChannelKind
        Agent kind.
    sender : str
        First AgentAddress.
    receiver : str
        Second AgentAddress.

    Attributes
    ----------
    kind : AgentChannelKind
        Agent kind.
    sender : str
        First AgentAddress.
    receiver : str
        Second AgentAddress.
    """

    def __init__(self, kind, receiver, sender, twin_uuid=None):
        self.kind = AgentChannelKind(kind)
        self.receiver = receiver
        self.sender = sender
        self.transport = receiver.transport if receiver else sender.transport
        self.serializer = (
            receiver.serializer if receiver else sender.serializer
        )
        self.uuid = unique_identifier()
        self.twin_uuid = twin_uuid
        # Set up pairs
        if sender:
            self.sender.channel = self
        if receiver:
            self.receiver.channel = self

    def __repr__(self):
        """
        Return the string representation of the AgentChannel.

        Returns
        -------
        representation : str
        """
        return 'AgentChannel(kind=%s, receiver=%s, sender=%s)' % (
            self.kind,
            self.receiver,
            self.sender,
        )

    def __hash__(self):
        return hash(self.kind) ^ hash(self.receiver) ^ hash(self.sender)

    def __eq__(self, other):
        if not isinstance(other, AgentChannel):
            return False
        return (
            self.kind == other.kind
            and self.receiver == other.receiver
            and self.sender == other.sender
        )

    def twin(self):
        """
        Get the twin channel of the current one.

        Returns
        -------
        AgentChannel
            The twin channel.
        """
        kind = self.kind.twin()
        sender = self.receiver.twin() if self.receiver is not None else None
        receiver = self.sender.twin() if self.sender is not None else None
        return self.__class__(
            kind=kind, receiver=receiver, sender=sender, twin_uuid=self.uuid
        )