georgeyk/loafer

View on GitHub
loafer/ext/aws/handlers.py

Summary

Maintainability
A
0 mins
Test Coverage
import json
import logging

from .bases import BaseSNSClient, BaseSQSClient

logger = logging.getLogger(__name__)


class SQSHandler(BaseSQSClient):
    queue_name = None

    def __init__(self, queue_name=None, **kwargs):
        self.queue_name = queue_name or self.queue_name
        super().__init__(**kwargs)

    def __str__(self):
        return '<{}: {}>'.format(type(self).__name__, self.queue_name)

    async def publish(self, message, encoder=json.dumps):
        if not self.queue_name:
            raise ValueError('{}: missing queue_name attribute'.format(type(self).__name__))

        if encoder:
            message = encoder(message)

        logger.debug('publishing, queue={}, message={}'.format(self.queue_name, message))

        queue_url = await self.get_queue_url(self.queue_name)
        async with self.get_client() as client:
            return await client.send_message(QueueUrl=queue_url, MessageBody=message)

    async def handle(self, message, *args):
        return await self.publish(message)


class SNSHandler(BaseSNSClient):
    topic = None

    def __init__(self, topic=None, **kwargs):
        self.topic = topic or self.topic
        super().__init__(**kwargs)

    def __str__(self):
        return '<{}: {}>'.format(type(self).__name__, self.topic)

    async def publish(self, message, encoder=json.dumps):
        if not self.topic:
            raise ValueError('{}: missing topic attribute'.format(type(self).__name__))

        if encoder:
            message = encoder(message)

        topic_arn = await self.get_topic_arn(self.topic)
        logger.debug('publishing, topic={}, message={}'.format(topic_arn, message))

        msg = json.dumps({'default': message})
        async with self.get_client() as client:
            return await client.publish(TopicArn=topic_arn, MessageStructure='json', Message=msg)

    async def handle(self, message, *args):
        return await self.publish(message)