# -*- coding: utf-8 -*-
"""
transistor.managers.base_manager
~~~~~~~~~~~~
This module implements BaseWorkGroupManager as a fully functional base class
which can assign tasks and conduct a scrape job across an arbitrary number
of WorkGroups. Each WorkGroup can contain an arbitrary number of
Worker/Scrapers.

Its `tasks` parameter can accept a StatefulBook instance, which transforms
a spreadsheet column into keyword search-term tasks. The `tasks` parameter
can also accept an ExchangeQueue instance, which creates an AMQP-compatible
exchange and queue, while BaseWorkGroupManager acts as a consumer/worker,
processing tasks from a broker like RabbitMQ or Redis.

Although this class is fully functional as-is, the monitor() method provides an
excellent hook point for post-scrape Worker manipulation. A more robust
implementation will subclass BaseWorkGroupManager and override the monitor()
method for customization.

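A minimal usage sketch (here, `book` is a previously created StatefulBook or
ExchangeQueue instance and `groups` is a list of WorkGroup namedtuples, as
shown in the __init__ docstring example; the names are illustrative):

    >>> manager = BaseWorkGroupManager(job_id='job1', tasks=book,
    >>>                                workgroups=groups, pool=12)
    >>> manager.main()  # spawn workers, assign tasks, and wait for completion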

:copyright: Copyright (C) 2018 by BOM Quote Limited
:license: The MIT License, see LICENSE for more details.
~~~~~~~~~~~~
"""

import gevent
import json
from typing import List, Union
from gevent.queue import Queue, Empty
from gevent.pool import Pool
from gevent.exceptions import LoopExit
from kombu import Connection
from kombu.mixins import ConsumerMixin
from transistor.schedulers.books.bookstate import StatefulBook
from transistor.schedulers.brokers.queues import ExchangeQueue
from transistor.workers.workgroup import WorkGroup
from transistor.exceptions import IncompatibleTasks
from transistor.utility.logging import logger
from kombu.utils.functional import reprcall


class BaseWorkGroupManager(ConsumerMixin):
    """
    Base class for a WorkGroupManager.
    """
    __attrs__ = [
        'job_id', 'tasks', 'groups', 'pool', 'qitems', 'workgroups',
        'connection', 'kombu',
    ]

    def __init__(self, job_id, tasks: Union[StatefulBook, ExchangeQueue],
                 workgroups: List[WorkGroup], pool: int = 20,
                 connection: Connection = None, should_stop=True, **kwargs):
        """

        Create the instance.

        :param job_id: will save the result of the workers Scrapes to `job_id` list.
        If this job_id is "NONE" then it will pass on the save.
        :param tasks:  a StatefulBook or ExchangeQueue instance.
        :param workgroups: a list of class: `WorkGroup()` objects.
        :param pool: size of the greenlets pool. If you want to utilize all the
        workers concurrently, it should be at least the total number
        of all workers + 1 for the manager and +1 for the broker runner in
        self.run() method. Otherwise, the pool is also useful to constrain
        concurrency to help stay within Crawlera subscription limits.
        :param connection: a kombu Connection object, should include the URI to
        connect to either RabbitMQ or Redis.
        :param should_stop: whether to run indefinitely or to stop after the
        manager queue runs empty.
        Example:
            >>> groups = [
            >>> WorkGroup(class_=MouseKeyGroup, workers=5, kwargs={"china":True}),
            >>> WorkGroup(class_=MouseKeyGroup, workers=5, kwargs={})
            >>> ]
        :param pool: number of greenlets to create
        """
        self.job_id = job_id
        self.tasks = tasks
        self.groups = workgroups
        self.pool = Pool(pool)
        self.qitems = {}
        self.workgroups = {}
        self.qtimeout = kwargs.get('qtimeout', 5)
        self.mgr_qtimeout = self.qtimeout//2 if self.qtimeout else None
        self.connection = connection
        self.kombu = False
        self.mgr_should_stop = should_stop
        self.mgr_no_work = False
        # call this last
        self._init_tasks(kwargs)

    def _init_tasks(self, kwargs):
        """
        Create individual task queues for the workers.

        If a StatefulBook instance is passed as the `tasks` parameter, each
        tracker whose name matches a workgroup name is effectively that
        workgroup's task queue. So, extract the tracker names from
        self.tasks.to_do(); each tracker name should match a workgroup name.

        Extract each tracker name and then create qitems:

        Example hint, `self.tasks.to_do()` looks like this:
        deque([<TaskTracker(name=mousekey.cn)>, <TaskTracker(name=mousekey.com)>])
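
        After this method runs, ``self.qitems`` maps each tracker name to a
        gevent Queue, for example (illustrative)::

            {'mousekey.cn': <Queue>, 'mousekey.com': <Queue>}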
        """
        if isinstance(self.tasks, StatefulBook):
            for tracker in self.tasks.to_do():
                # set the name of qitems key to tracker.name
                self.qitems[tracker.name] = Queue(items=tracker.to_do())

        elif isinstance(self.tasks, ExchangeQueue):
            for tracker in self.tasks.trackers:
                self.qitems[tracker] = Queue()
            self.kombu = True

        else:
            raise IncompatibleTasks('`task` parameter must be an instance of '
                                    'StatefulBook or ExchangeQueue')

        # If the tasks object is not a StatefulBook, it should present a
        # list-like collection of trackers/queues. That is, a tasks class X is
        # compatible if it has attributes X.name and X.to_do(), where
        # X.to_do() returns an object appropriate for Queue(items=X.to_do()).

        self._init_workers(kwargs)

    def _init_workers(self, kwargs):
        """
        Create the WorkGroups by tracker name and assign them by name to the
        workgroups dict.

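        For example (illustrative), after this method runs, ``self.workgroups``
        might look like ``{'mousekey.com': <MouseKeyGroup>}``: keyed by tracker
        name, with an initialized BaseGroup instance as the value.
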
        :return:
        """
        # first, build a list of tracker names from qitems.keys()
        names = list(self.qitems.keys())
        for name in names:
            for group in self.groups:
                # match the tracker name to the group name
                if group.name == name:
                    # assumes `group` is a WorkGroup namedtuple
                    # add attrs to group.kwargs dict so they can be passed down
                    # to the group/worker/spider and assigned as attrs
                    group.kwargs['name'] = name
                    group.kwargs['url'] = group.url
                    group.kwargs['spider'] = group.spider
                    group.kwargs['worker'] = group.worker
                    group.kwargs['items'] = group.items
                    group.kwargs['loader'] = group.loader
                    # exporters is a list of exporter instances
                    group.kwargs['exporters'] = group.exporters
                    if not group.kwargs.get('qtimeout', None):
                        group.kwargs['qtimeout'] = self.qtimeout
                    basegroup = group.group(
                        staff=group.workers, job_id=self.job_id, **group.kwargs)
                    # now that attrs assigned, init the workers in the basegroup class
                    basegroup.init_workers()
                    # lastly, after calling init_workers, assign the workgroup
                    # instance to the workgroups dict with key = `name`
                    self.workgroups[name] = basegroup

    def get_consumers(self, Consumer, channel):
        """
        Must be implemented for Kombu ConsumerMixin
        """
        return [Consumer(queues=self.tasks.task_queues,
                         accept=['json'],
                         callbacks=[self.process_task])]

    def process_task(self, body, message):
        """
        Process messages to extract the task keywords and then
        load them into a gevent Queue for each tracker.

        To customize how this manager class works with the broker,
        this method should be the first consideration to override.

        The kwargs field is not currently used, but it could be useful
        for setting logic flags to branch on within this method.
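
        An incoming message body is expected to look like this (illustrative
        values)::

            {'keywords': '["term-1", "term-2"]', 'kwargs': {}}

        where `keywords` may be a JSON-encoded string (decoded below) or an
        already-decoded list of search terms.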
        """
        keywords = body['keywords']
        kwargs = body['kwargs']
        logger.info(f'Got task: {reprcall(keywords)}')
        try:
            if isinstance(keywords, str):
                keywords = json.loads(keywords)
            for key in self.qitems.keys():
                for item in keywords:
                    self.qitems[key].put(item)
            if not self.mgr_should_stop:
                if self.mgr_no_work:
                    gevent.spawn(self.manage).join()
        except Exception as exc:
            logger.error(f'task raised exception: {exc}')
        message.ack()

    def spawn_list(self):
        """"
        The spawn() method begins a new greenlet with the given arguments
        (which are passed to the greenlet constructor) and adds it to the
        collection of greenlets this group is monitoring.

        We return a list of the newly started greenlets, used in a later
        `joinall` call.

        :return: A list of the newly started greenlets.
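
        For example, with two workgroups of five workers each, the returned
        list contains eleven greenlets: one monitor() greenlet per worker,
        plus the single manage() greenlet appended last.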
        """

        # here, workgroups is a list of Type[BaseGroup] objects
        workgroups = list(self.workgroups.values())
        spawn_list = [self.pool.spawn(self.monitor, worker) for work_group in
                      workgroups for worker in work_group]

        # we get a blocking error if we spawn the manager first, so spawn it last
        spawn_list.append(self.pool.spawn(self.manage))

        return spawn_list

    def monitor(self, target):
        """
        This method actually spawns the spider, and its purpose is to allow
        some additional final actions to be performed after the worker completes
        the spider's job, but before it shuts down and the object instance is lost.

        The simplest example which must be implemented:

        def monitor(self, target):
            '''
            The only absolute requirement is to start the spider with
            target.spawn_spider() and then call gevent.sleep(0)
            '''
            target.spawn_spider()
            gevent.sleep(0)

        A more useful example:

        def monitor(self, target):
            '''
            More useful, would be to hook in some post-scrape logic between
            spawn_spider() and gevent.sleep(0).
            '''
            target.spawn_spider()
            # /start --> YOUR POST-SCRAPE HOOK IS HERE, ADD LOGIC AS REQUIRED.
            for event in target.events:
                # .events is a simple list() class attribute on the scraper object
                # we could apply some transformation to an object in event, now.
                print(f'THIS IS A MONITOR EVENT - > {event}')
            # /end --> YOUR POST SCRAPE HOOK LOGIC. Finally, call gevent.sleep()
            gevent.sleep(0)

        :param target: a worker
        :return:
        """
        target.spawn_spider()
        gevent.sleep(0)

    def manage(self):
        """"
        Manage will hand out work when the appropriate Worker is free.
        The manager timeout must be less than worker timeout, or else, the
        workers will be idled and shutdown.
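
        For example, with the default qtimeout=5 passed through __init__, the
        manager polls each task queue with mgr_qtimeout=2 (qtimeout // 2), so
        the manager gives up on an empty queue before the workers time out.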
        """
        try:
            while True:
                for name, workgroup in self.workgroups.items():
                    for qname, q in self.qitems.items():
                        if name == qname: # workgroup name must match tracker name
                            # a tracker with the same name as workgroup name, is...
                            # ...effectively, the workgroup's task queue, so now...
                            # assign a task to a worker from the workgroup's task queue
                            for worker in workgroup:
                                one_task = q.get(timeout=self.mgr_qtimeout)
                                worker.tasks.put(one_task)
                gevent.sleep(0)
        except Empty:
            self.mgr_no_work = True
            if self.mgr_should_stop:
                logger.info(f"Assigned all {name} work. I've been told I should stop.")
                self.should_stop = True
            else:
                logger.info(f"Assigned all {name} work. Awaiting more tasks to assign.")

    def main(self):
        """
        Start the scrape job: spawn the monitor and manage greenlets, run the
        kombu consumer loop if a broker is used, and then wait for all of the
        spawned greenlets to finish.
        """
        spawny = self.spawn_list()
        if self.kombu:
            gevent.spawn(self.run).join()
        try:
            gevent.joinall(spawny)
        except LoopExit:
            logger.error('No tasks. This operation would block forever.')
        # print([worker.get() for worker in spawny])
        gevent.sleep(0)