nil0x42/phpsploit

View on GitHub
src/core/plugins/__init__.py

Summary

Maintainability
A
3 hrs
Test Coverage
"""Phpsploit plugins handler
"""

import os
import re

import ui
import metadict
import core
from core import session
from datatypes import Path
from decorators.readonly_settings import readonly_settings
import utils.path

from .Plugin import Plugin
from .exceptions import BadPlugin

DEFAULT_PLUGIN = Plugin(core.BASEDIR +
                        "data/plugin-sample/category_name/plugin_example")


class Plugins(metadict.MetaDict):
    """Phpsploit plugins handler

    The Plugins() class represents the currently
    available plugins.
    """
    valid_plugin_name = re.compile("^[a-zA-Z0-9_-]+$")

    def __init__(self):
        """Initalize a plugins list instance"""
        self.errors = 0
        self.blacklist = []
        self.root_dirs = []
        self.root_dirs.append(Path(core.BASEDIR, "plugins", mode='drx'))
        self.root_dirs.append(Path(core.USERDIR, "plugins", mode='drx'))
        self.current_plugin = DEFAULT_PLUGIN
        super().__init__()

    @readonly_settings("VERBOSITY")
    def reload(self, verbose=None):
        """Reload the plugins list"""
        if verbose is not None:
            session.Conf.VERBOSITY = verbose
        self.clear()
        self.errors = 0
        categories = self._load_categories()
        num_loaded = self._load_plugins(categories)
        if self.errors:
            msg = "[#] %d errors encountered while loading plugins"
            if not verbose:
                msg += " (use `corectl reload-plugins` for + infos)"
            session.Conf.VERBOSITY = True
            print(msg % self.errors)
        if verbose or self.errors or ui.input.isatty():
            if num_loaded:
                print("[*] %d plugins correctly loaded" % num_loaded)
            else:
                print("[-] No plugins were loaded")
        return not self.errors

    def categories(self):
        """Get a list of existing plugin category names"""
        categories = []
        for plugin in self.values():
            categories.append(plugin.category)
        categories.sort()
        return list(set(categories))

    def run(self, argv):
        """Execute the plugin matching given argv list
        """
        # make current_plugin point to self plugin instance
        # this allows api module imports to get triggering
        # plugin attributes.
        plugin = self[argv[0]]
        self.current_plugin = plugin
        try:
            return plugin.run(argv)
        finally:
            self.current_plugin = DEFAULT_PLUGIN

    def _log_error(self, path, errmsg, _type="plugin"):
        print("[#] Couldn't load %s: «%s»" % (_type, path))
        print("[#]     " + errmsg)
        print("[#] ")
        self.errors += 1

    def _load_categories(self):
        """Load currently existing categories.

        The categories are returned in the form of a dictionnary.
        Each element key contains the category name, while value
        if a list of each child plugin absolute path.

        Example:
        >>> self.load_categories()
        {'system': ['/plugins/system/ls', '/plugins/system/pwd'],
         'sql': ['/plugins/sql/mysql', '/plugins/sql/mssql']}

        """
        category_dirs = []
        for root_dir in self.root_dirs:
            category_dirs += self._list_path_dirs(root_dir, _type="category")

        categories = {}
        for basename, abspath in category_dirs:
            if basename not in categories:
                categories[basename] = []
            categories[basename].append(abspath)
        return categories

    def _load_plugins(self, categories):
        """Fill current plugins instance with currently available plugins.

        All plugins are added to self items, with a key equal to the
        plugin name. Each plugin value is a Plugin() instance.

        Some plugins may fail to load, so numer of load errors is returned
        """
        num_loaded = 0
        for cat_paths in categories.values():
            for cat_path in cat_paths:
                cat_elems = self._list_path_dirs(cat_path, _type="plugin")
                for name, path in cat_elems:
                    if name in self.keys():
                        msg = "Name already taken by %r" % self[name].path
                        self._log_error(path, msg)
                    elif name in self.blacklist:
                        msg = "Name already taken by `%s` command" % name
                        self._log_error(path, msg)
                    try:
                        self[name] = Plugin(path)
                    except BadPlugin:
                        self.errors += 1
                    else:
                        num_loaded += 1
        return num_loaded

    def _list_path_dirs(self, root_dir, _type="plugin"):
        """Returns a list of tuples representing a plugin directory.

        Each tuple is in the form: (basename, abspath)

        Example:
        >>> self._list_path_dirs("/plugins/system")
        [("ls", "/plugins/system/ls"), ("pwd", "/plugins/system/pwd")]

        """
        elems = []
        for basename in os.listdir(root_dir):
            path = utils.path.truepath(root_dir, basename)
            if not os.path.isdir(path):
                self._log_error(path, "Not a directory", _type)
            elif not os.access(path, os.X_OK | os.R_OK):
                self._log_error(path, "Permission denied", _type)
            elif not self.valid_plugin_name.match(basename):
                msg = ("Folder name doesn't match " +
                       repr(self.valid_plugin_name.pattern))
                self._log_error(path, msg, _type)
            else:
                elems.append((basename, path))
        return elems


# instanciate plugins list
plugins = Plugins()