fuzeman/byte

View on GitHub
byte/core/plugin/manager.py

Summary

Maintainability
F
3 days
Test Coverage
"""byte - plugin manager module."""

from __future__ import absolute_import, division, print_function

from byte import __path__ as byte_path
from byte.compilers.core.base import CompilerPlugin
from byte.core.plugin.base import Plugin
from byte.executors.core.base import ExecutorPlugin
from byte.formats.core.base import CollectionFormatPlugin, DocumentFormatPlugin

import imp
import inspect
import logging
import os
import pkgutil
import sys

log = logging.getLogger(__name__)


class PluginManager(object):
    """Plugin manager class."""

    kinds = [
        'compiler',
        'executor',
        'format'
    ]

    collections = {
        CompilerPlugin: [
            ('compilers_by_content_type',           'content_type'),    # noqa
            ('compilers_by_extension',              'extension')        # noqa
        ],
        ExecutorPlugin: [
            ('executors_by_content_type',           'content_type'),    # noqa
            ('executors_by_extension',              'extension'),       # noqa
            ('executors_by_scheme',                 'scheme')           # noqa
        ],
        CollectionFormatPlugin: [
            ('collection_formats_by_content_type',  'content_type'),    # noqa
            ('collection_formats_by_extension',     'extension')        # noqa
        ],
        DocumentFormatPlugin: [
            ('document_formats_by_content_type',    'content_type'),    # noqa
            ('document_formats_by_extension',       'extension')        # noqa
        ]
    }

    def __init__(self, modules=None):
        """Create plugin manager.

        :param modules: Default plugin modules
        :type modules: list of module or None
        """
        # Compilers
        self.compilers_by_content_type = {}
        self.compilers_by_extension = {}

        # Executors
        self.executors_by_content_type = {}
        self.executors_by_extension = {}
        self.executors_by_scheme = {}

        # Formats
        self.collection_formats_by_content_type = {}
        self.collection_formats_by_extension = {}

        self.document_formats_by_content_type = {}
        self.document_formats_by_extension = {}

        # Plugins
        self.plugins = {}
        self.plugins_by_kind = {}

        # Update plugin registry
        self.update(modules, reset=False)

    @classmethod
    def discover(cls, packages=('compilers', 'executors', 'formats')):
        """Discover plugin modules.

        :param packages: Package names
        :type packages: tuple of str
        """
        scanned_paths = set()

        for search_path in byte_path:
            search_path = os.path.normcase(os.path.normpath(
                os.path.realpath(search_path)
            ))

            if search_path in scanned_paths:
                continue

            scanned_paths.add(search_path)

            # Scan `search_path` for `packages`
            for package in packages:
                # Find package
                try:
                    _, package_path, _ = imp.find_module(package, [search_path])
                except ImportError:
                    continue

                # Discover plugins in package
                for mod in cls.discover_package(package, package_path):
                    yield mod

    @staticmethod
    def discover_package(package, package_path):
        """Discover plugin modules in package.

        :param package: Package name
        :type package: str

        :param package_path: Package path
        :type package_path: str
        """
        # Iterate over modules in package
        for _, name, _ in pkgutil.iter_modules([package_path]):
            # Find module
            try:
                fp, module_path, description = imp.find_module(name, [package_path])
            except ImportError as ex:
                log.warn('Unable to find module \'%s\' in %r: %s', name, package_path, ex)
                continue

            # Return existing module (if available)
            full_name = 'byte.%s.%s' % (package, name)

            if full_name in sys.modules:
                yield sys.modules[full_name]
                continue

            # Import module
            try:
                yield imp.load_module(full_name, fp, module_path, description)
            except ImportError as ex:
                log.warn('Unable to load module \'%s\' in %r: %s', name, module_path, ex)
                continue

    def get(self, kind, key):
        """Get plugin.

        Raises :code:`ValueError` on unknown plugin type or key.

        :param kind: Plugin type
        :type kind: str

        :param key: Plugin key
        :type key: str
        """
        if kind not in self.kinds:
            raise ValueError('Unknown plugin kind: %s' % (kind,))

        # Retrieve plugins matching `kind`
        plugins = self.plugins_by_kind.get(kind)

        if not plugins:
            raise ValueError('No \'%s\' plugins have been registered' % (kind,))

        # Return plugin matching `key` (if one exists)
        plugin = plugins.get(key)

        if not plugin:
            raise ValueError('No \'%s\' %s available' % (key, kind))

        return plugin

    def get_compiler(self, key):
        """Retrieve compiler by key.

        :param key: Key
        :type key: str
        """
        return self.plugins_by_kind.get('compiler', {})[key]

    def get_compiler_by_content_type(self, content_type):
        """Retrieve compiler by content type.

        :param content_type: Content type
        :type content_type: str
        """
        compilers = self.compilers_by_content_type.get(content_type)

        if not compilers:
            raise KeyError(content_type)

        _, compiler = compilers[0]
        return compiler

    def get_compiler_by_extension(self, extension):
        """Retrieve compiler by file extension.

        :param extension: File extension
        :type extension: str
        """
        compilers = self.compilers_by_extension.get(extension)

        if not compilers:
            raise KeyError(extension)

        _, compiler = compilers[0]
        return compiler

    def get_executor_by_scheme(self, scheme):
        """Retrieve executor by URI scheme.

        :param scheme: URI Scheme
        :type scheme: str
        """
        executors = self.executors_by_scheme.get(scheme)

        if not executors:
            raise KeyError(scheme)

        _, executor = executors[0]
        return executor

    def get_collection_format_by_content_type(self, content_type):
        """Retrieve collection format by content type.

        :param content_type: Content type
        :type content_type: str
        """
        formats = self.collection_formats_by_content_type.get(content_type)

        if not formats:
            raise KeyError(content_type)

        _, fmt = formats[0]
        return fmt

    def get_collection_format_by_extension(self, extension):
        """Retrieve collection format by file extension.

        :param extension: File extension
        :type extension: str
        """
        formats = self.collection_formats_by_extension.get(extension)

        if not formats:
            raise KeyError(extension)

        _, fmt = formats[0]
        return fmt

    def get_document_format_by_content_type(self, content_type):
        """Retrieve document format by content type.

        :param content_type: Content type
        :type content_type: str
        """
        formats = self.document_formats_by_content_type.get(content_type)

        if not formats:
            raise KeyError(content_type)

        _, fmt = formats[0]
        return fmt

    def get_document_format_by_extension(self, extension):
        """Retrieve document format by file extension.

        :param extension: File extension
        :type extension: str
        """
        formats = self.document_formats_by_extension.get(extension)

        if not formats:
            raise KeyError(extension)

        _, fmt = formats[0]
        return fmt

    def register(self, plugin):
        """Register plugin.

        :param plugin: Plugin
        """
        # Ensure plugin has a "key" defined
        if plugin.key is None:
            log.warn('Plugin \'%s\' in module \'%s\' has no "key" property defined', plugin.__name__, plugin.__module__)
            return False

        # Resolve plugin meta
        meta = getattr(plugin, 'Meta', None)

        if not meta:
            log.warn('Plugin \'%s\' has no meta defined', plugin.key)
            return False

        # Transform plugin meta values
        meta.transform()

        # Validate plugin meta
        try:
            meta.validate(plugin)
        except AssertionError as ex:
            log.warn('Plugin \'%s\' failed validation: %s', plugin.key, ex.message)
            return False

        # Ensure plugin `meta.kind` collection exists
        if meta.kind not in self.plugins_by_kind:
            self.plugins_by_kind[meta.kind] = {}

        # Ensure plugin hasn't already been registered
        if (meta.kind, plugin.key) in self.plugins or plugin.key in self.plugins_by_kind[meta.kind]:
            log.warn('Plugin with kind \'%s\' and key \'%s\' has already been registered', meta.kind, plugin.key)
            return False

        # Register plugin
        self.plugins[(meta.kind, plugin.key)] = plugin
        self.plugins_by_kind[meta.kind][plugin.key] = plugin

        # Register plugin in collections
        self.register_collections(plugin, meta)

        log.debug('Registered %s \'%s\'', meta.kind, plugin.key)
        return True

    def register_collections(self, plugin, meta):
        """Register plugin in collections.

        :param plugin: Plugin
        :param meta: Plugin metadata
        """
        for cls, collections in self.collections.items():
            if not issubclass(plugin, cls):
                continue

            for name, attribute in collections:
                # Retrieve collection dictionary
                collection = getattr(self, name)

                if collection is None:
                    raise ValueError('Unknown collection: %s' % (name,))

                # Register plugin in collection
                self.register_attribute(collection, attribute, plugin, meta)

    def register_attribute(self, collection, attribute, plugin, meta):
        """Register plugin in collection.

        :param collection: Plugin collection
        :type collection: dict

        :param attribute: Attribute name
        :type attribute: str

        :param plugin: Plugin
        :param meta: Plugin metadata
        """
        for value in (getattr(meta, attribute) or []):
            priority, value = self._resolve_definition(value)

            # Ensure `attribute` collection exists
            if value not in collection:
                collection[value] = []

            # Register plugin by `attribute`
            collection[value].append((plugin.priority + (priority / 10), plugin))
            collection[value].sort()

    def reset(self):
        """Reset plugin registry."""
        self.executors_by_scheme = {}

        self.plugins = {}
        self.plugins_by_kind = {}

    def update(self, modules=None, reset=True):
        """Update plugins registry.

        :param modules: Modules to scan for plugins
        :type modules: list of module

        :param reset: Reset registry
        :type reset: bool
        """
        # Reset state (if enabled)
        if reset:
            self.reset()

        # Find (and register) plugin classes in `modules`
        for mod in (modules or self.discover()):
            for key, value in mod.__dict__.items():
                if key.startswith('_') or not inspect.isclass(value):
                    continue

                if value.__module__ != mod.__name__ and value.__module__ != mod.__name__ + '.main':
                    continue

                if not issubclass(value, Plugin):
                    continue

                self.register(value)

    @staticmethod
    def _resolve_definition(value):
        if type(value) is tuple and len(value) == 2:
            return value

        return 1000, value

    def __contains__(self, key):
        return key in self.plugins

    def __len__(self):
        return len(self.plugins)