smok-serwis/coolamqp

View on GitHub
coolamqp/framing/frames.py

Summary

Maintainability
A
35 mins
Test Coverage
B
89%
# coding=UTF-8
"""
Concrete frame definitions
"""
from __future__ import absolute_import, division, print_function

import struct

import six

from coolamqp.framing.base import AMQPFrame
from coolamqp.framing.definitions import FRAME_METHOD, FRAME_HEARTBEAT, \
    FRAME_BODY, FRAME_HEADER, FRAME_END, \
    IDENT_TO_METHOD, CLASS_ID_TO_CONTENT_PROPERTY_LIST, FRAME_END_BYTE

STRUCT_BH = struct.Struct('!BH')
STRUCT_BHL = struct.Struct('!BHL')
STRUCT_HH = struct.Struct('!HH')
STRUCT_BHLHHQ = struct.Struct('!BHLHHQ')
STRUCT_HHQ = struct.Struct('!HHQ')
STRUCT_BHLB = struct.Struct('!BHLB')


class AMQPMethodFrame(AMQPFrame):
    FRAME_TYPE = FRAME_METHOD
    __slots__ = ('payload', )

    def __init__(self, channel, payload):
        # type: (int, coolamqp.framing.base.AMQPMethodPayload) -> None
        """
        :param channel: channel ID
        :param payload: AMQPMethodPayload instance
        """
        AMQPFrame.__init__(self, channel)
        self.payload = payload

    def __str__(self):  # type: () -> str
        return 'AMQPMethodFrame(%s, %s)' % (self.channel, self.payload)

    def write_to(self, buf):
        if self.payload.IS_CONTENT_STATIC:
            buf.write(STRUCT_BH.pack(FRAME_METHOD, self.channel))
            buf.write(self.payload.STATIC_CONTENT)
        else:
            buf.write(STRUCT_BHL.pack(FRAME_METHOD, self.channel,
                                      4 + self.payload.get_size()))
            buf.write(self.payload.BINARY_HEADER)
            self.payload.write_arguments(buf)
            buf.write(FRAME_END_BYTE)

    @classmethod
    def unserialize(cls, channel, payload_as_buffer):
        clsmet = STRUCT_HH.unpack_from(payload_as_buffer, 0)

        try:
            method_payload_class = IDENT_TO_METHOD[clsmet]
            payload = method_payload_class.from_buffer(payload_as_buffer, 4)
        except KeyError:
            raise ValueError('Invalid class %s method %s' % clsmet)
        else:
            return cls(channel, payload)

    def get_size(self):  # type: () -> int
        # frame_header = (method(1) + channel(2) + length(4) + class(2) + method(2) + payload(N) + frame_end(1))
        return 12 + self.payload.get_size()


class AMQPHeaderFrame(AMQPFrame):
    """
    A frame containing a message header

    :param channel: channel ID
    :type channel: int
    :param class_id: class ID
    :type class_id: int
    :param weight: weight (lol wut?)
    :param body_size: size of the body to follow
    :param properties: a suitable AMQPContentPropertyList instance
    """
    __slots__ = ('class_id', 'weight', 'body_size', 'properties')
    FRAME_TYPE = FRAME_HEADER

    def __init__(self, channel, class_id, weight, body_size, properties):
        AMQPFrame.__init__(self, channel)
        self.class_id = class_id
        self.weight = weight
        self.body_size = body_size
        self.properties = properties

    def write_to(self, buf):
        buf.write(STRUCT_BHLHHQ.pack(FRAME_HEADER, self.channel,
                                     12 + self.properties.get_size(), self.class_id,
                                     0,
                                     self.body_size))
        self.properties.write_to(buf)
        buf.write(FRAME_END_BYTE)

    @staticmethod
    def unserialize(channel, payload_as_buffer):
        # payload starts with class ID
        class_id, weight, body_size = STRUCT_HHQ.unpack_from(payload_as_buffer, 0)
        properties = CLASS_ID_TO_CONTENT_PROPERTY_LIST[class_id].from_buffer(
            payload_as_buffer, 12)
        return AMQPHeaderFrame(channel, class_id, weight, body_size,
                               properties)

    def get_size(self):  # type: () -> int
        # frame header is always 7, frame end is 1, content header is 12 + props
        return 20 + self.properties.get_size()

    def __str__(self):  # type: () -> str
        return 'AMQPHeaderFrame(%s, %s, %s, %s, %s)' % (self.channel, self.class_id, self.weight,
                                                        self.body_size, self.properties)


class AMQPBodyFrame(AMQPFrame):
    """
    A frame containing message body

    :param channel: Channel ID
    :type channel: int
    :param data: body (or a piece of it) of a message
    :type data: binary
    """
    __slots__ = ('data', )
    FRAME_TYPE = FRAME_BODY

    FRAME_SIZE_WITHOUT_PAYLOAD = 8

    def __init__(self, channel, data):  # type: (int, bytes) -> None
        AMQPFrame.__init__(self, channel)
        assert isinstance(data, (six.binary_type, memoryview))
        self.data = data

    def write_to(self, buf):
        buf.write(
            STRUCT_BHL.pack(FRAME_BODY, self.channel, len(self.data)))
        buf.write(self.data)
        buf.write(FRAME_END_BYTE)

    @classmethod
    def unserialize(cls, channel, payload_as_buffer):
        return cls(channel, payload_as_buffer)

    def get_size(self):  # type: () -> int
        return 8 + len(self.data)

    def __str__(self):  # type: () -> str
        return '<AMQPBodyFrame of size %s>' % (len(self.data),)


class AMQPHeartbeatFrame(AMQPFrame):
    FRAME_TYPE = FRAME_HEARTBEAT
    LENGTH = 8
    DATA = STRUCT_BHLB.pack(FRAME_HEARTBEAT, 0, 0, FRAME_END)
    __slots__ = ()

    def __init__(self):
        AMQPFrame.__init__(self, 0)

    def write_to(self, buf):
        buf.write(AMQPHeartbeatFrame.DATA)

    def get_size(self):
        return AMQPHeartbeatFrame.LENGTH

    def __str__(self):
        return 'AMQPHeartbeatFrame()'