bomquote/transistor

View on GitHub
transistor/schedulers/brokers/queues.py

Summary

Maintainability
A
0 mins
Test Coverage
# -*- coding: utf-8 -*-
"""
transistor.schedulers.brokers.queues
~~~~~~~~~~~~
Exchange and task queue with support for using different queues for each
tracker. Works with RabbitMQ or Redis acting as a broker.

:copyright: Copyright (C) 2018 by BOM Quote Limited
:license: The MIT License, see LICENSE for more details.
~~~~~~~~~~~~
"""
from kombu import Exchange, Queue
from typing import List


class ExchangeQueue:
    """
    Build a single `Exchange` plus one dedicated `Queue` per tracker name.

    Every name in `trackers` yields a queue carrying that name, bound to the
    shared exchange, with the tracker name doubling as the routing key.

    Note: best practice is to declare the queues explicitly when using this
    class. With an instance called `tasks` and an open `broker_connection`,
    that would probably look like:

        for queue in tasks.task_queues:
            queue(broker_connection).declare()

    :param trackers: tracker names; one queue is created for each entry.
    :param exchange_name: name passed to `Exchange` as its first argument.
    :param exchange_type: value passed to `Exchange` as its `type` keyword.
    """
    def __init__(self, trackers: List[str], exchange_name: str='transistor',
                 exchange_type: str='direct'):
        # Start with an empty queue list; it is filled in at the end, once
        # the exchange the queues bind to exists.
        self.task_queues = []
        self.trackers = trackers
        self.exchange_name = exchange_name
        self.exchange_type = exchange_type
        self.task_exchange = Exchange(self.exchange_name, type=self.exchange_type)
        self._init_task_queues()

    def _init_task_queues(self):
        """
        Add one `Queue` to `self.task_queues` for each name in `self.trackers`.

        For example, with `trackers = ['mousekey.com', 'digidog.com',
        'futuredigi.com']` the list ends up as:

        self.task_queues = [
           Queue('mousekey.com', task_exchange, routing_key='mousekey.com'),
           Queue('digidog.com', task_exchange, routing_key='digidog.com'),
           Queue('futuredigi.com', task_exchange, routing_key='futuredigi.com')]

        The queues are appended to the existing list, in tracker order.
        """
        exchange = self.task_exchange
        self.task_queues.extend(
            Queue(name, exchange, routing_key=name) for name in self.trackers
        )