georgeyk/loafer

View on GitHub
loafer/ext/aws/routes.py

Summary

Maintainability
C
1 day
Test Coverage
from ...routes import Route
from .message_translators import SNSMessageTranslator, SQSMessageTranslator
from .providers import SQSProvider


class SQSRoute(Route):
    """Route whose provider is an ``SQSProvider`` built from a queue name.

    By default messages are translated with ``SQSMessageTranslator`` and
    the route is named after the queue.
    """

    def __init__(self, provider_queue, provider_options=None, *args, **kwargs):
        """Build the SQS provider and delegate to ``Route.__init__``.

        Args:
            provider_queue: Queue identifier handed to ``SQSProvider``; also
                used as the route ``name`` when none is given by keyword.
            provider_options: Optional mapping expanded as keyword arguments
                to ``SQSProvider``. Any falsy value is treated as ``{}``.
            *args: Forwarded unchanged to ``Route.__init__``.
            **kwargs: Forwarded to ``Route.__init__`` after the adjustments
                below. A caller-supplied ``provider`` keyword is always
                replaced by the provider built here.
        """
        options = provider_options if provider_options else {}
        kwargs['provider'] = SQSProvider(provider_queue, **options)

        # Only instantiate the default translator when the caller did not
        # supply one by keyword.
        if 'message_translator' not in kwargs:
            kwargs['message_translator'] = SQSMessageTranslator()
        kwargs.setdefault('name', provider_queue)

        super().__init__(*args, **kwargs)


class SNSQueueRoute(Route):
    """Route backed by an ``SQSProvider`` with an SNS translator by default.

    The provider is an ``SQSProvider`` for the given queue; unless overridden
    by keyword, messages are translated with ``SNSMessageTranslator`` and the
    route is named after the queue.
    NOTE(review): presumably meant for queues subscribed to an SNS topic --
    confirm against the translator implementation.
    """

    def __init__(self, provider_queue, provider_options=None, *args, **kwargs):
        """Build the SQS provider and delegate to ``Route.__init__``.

        Args:
            provider_queue: Queue identifier handed to ``SQSProvider``; also
                used as the route ``name`` when none is given by keyword.
            provider_options: Optional mapping expanded as keyword arguments
                to ``SQSProvider``. Any falsy value is treated as ``{}``.
            *args: Forwarded unchanged to ``Route.__init__``.
            **kwargs: Forwarded to ``Route.__init__`` after the adjustments
                below. A caller-supplied ``provider`` keyword is always
                replaced by the provider built here.
        """
        options = provider_options if provider_options else {}
        kwargs['provider'] = SQSProvider(provider_queue, **options)

        # Only instantiate the default translator when the caller did not
        # supply one by keyword.
        if 'message_translator' not in kwargs:
            kwargs['message_translator'] = SNSMessageTranslator()
        kwargs.setdefault('name', provider_queue)

        super().__init__(*args, **kwargs)