cloudpassage/don-bot

View on GitHub
app/donlib/config_helper.py

Summary

Maintainability
B
4 hrs
Test Coverage
import os
import re
from .utility import Utility
from halocelery.apputils import Utility as hc_util


class ConfigHelper(object):
    """This class contains all application configuration variables.

    All configuration variables in this class are derived from environment
    variables.

    Attributes:
        halo_api_key (str): Halo API key, sometimes referred to as 'key id'
        halo_api_secret_key (str): Halo API secret associated with halo_api_key
        halo_api_hostname (str): Hostname for Halo API
        halo_api_port (str): Halo API port
        slack_api_token (str): Slack API token
        slack_username (str): Donbot's user name (purely cosmetic)
        slack_icon (str): Donbot's avatar

    """
    def __init__(self):
        # Halo configs
        self.halo_api_key = os.getenv("HALO_API_KEY", "HARDSTOP")
        self.halo_api_secret_key = os.getenv("HALO_API_SECRET_KEY", "HARDSTOP")
        self.halo_api_host = os.getenv("HALO_API_HOSTNAME", "HARDSTOP")
        self.halo_api_port = os.getenv("HALO_API_PORT", "HARDSTOP")

        self.ua = ConfigHelper.get_ua_string()
        self.product_version = ConfigHelper.get_product_version()

        # Slack configs
        self.slack_api_token = os.getenv("SLACK_API_TOKEN", "HARDSTOP")
        self.slack_username = os.getenv("SLACK_USERNAME", "donbot")
        self.slack_icon_url = os.getenv("SLACK_ICON_URL", "")
        self.slack_channel = os.getenv("SLACK_CHANNEL", "halo")

        # Flower host for task status reporting
        self.flower_host = os.getenv("FLOWER_HOST")

        # IP Blocker configs
        self.ip_zone_name = os.getenv("IPBLOCKER_IP_ZONE_NAME", "")
        self.ipblocker_enable = Utility.bool_from_env("IPBLOCKER_ENABLED")
        self.ipblocker_trigger_events = Utility.list_from_env(
            "IPBLOCKER_TRIGGER_EVENTS")
        self.ipblocker_trigger_only_on_critical = Utility.bool_from_env(
            "IPBLOCKER_TRIGGER_ONLY_ON_CRITICAL")

        # Quarantine configs
        self.quarantine_enable = Utility.bool_from_env("QUARANTINE_ENABLED")
        self.quarantine_trigger_group_names = Utility.list_from_env(
            "QUARANTINE_TRIGGER_GROUP_NAMES")
        self.quarantine_trigger_events = Utility.list_from_env(
            "QUARANTINE_TRIGGER_EVENTS")
        self.quarantine_trigger_only_on_critical = Utility.bool_from_env(
            "QUARANTINE_TRIGGER_ONLY_ON_CRITICAL")
        self.quarantine_group_name = os.getenv("QUARANTINE_GROUP_NAME", "")

        # Event collector configs
        self.monitor_events = os.getenv("MONITOR_EVENTS", "no")

        self.suppress_events = os.getenv("SUPPRESS_EVENTS_IN_CHANNEL", "")
        self.max_threads = 5  # Max threads to be used by event collector
        self.halo_batch_size = 5  # Pagination depth for event collector
        # Check that Quarantine and IP Blocker configs are sane.
        if not self.quarantine_config_is_sane():
            self.quarantine_enable = False
        if not self.ipblocker_config_is_sane():
            self.ipblocker_enable = False

    @classmethod
    def get_ua_string(cls):
        """Return user agent string for Halo API interaction."""
        product = "HaloSlackbot"
        version = ConfigHelper.get_product_version()
        ua_string = product + "/" + version
        return ua_string

    @classmethod
    def get_product_version(cls):
        """Get version of donbot from __init__.py."""
        init = open(os.path.join(os.path.dirname(__file__),
                    "__init__.py")).read()
        rx_compiled = re.compile(r"\s*__version__\s*=\s*\"(\S+)\"")
        version = rx_compiled.search(init).group(1)
        return version

    def sane(self):
        """Test to make sure that config items for Halo and Slack are set.

        Returns:
            True if everything is OK, False if otherwise

        """
        sanity = True
        template = "Required configuration variable {0} is not set!"
        critical_vars = {"HALO_API_KEY": self.halo_api_key,
                         "HALO_API_SECRET": self.halo_api_secret_key,
                         "HALO_API_HOSTNAME": self.halo_api_host,
                         "HALO_API_PORT": self.halo_api_port,
                         "SLACK_API_TOKEN": self.slack_api_token}
        for name, varval in critical_vars.items():
            if varval == "HARDSTOP":
                sanity = False
                hc_util.log_stdout(template.format(name))
        return sanity

    def quarantine_config_is_sane(self):
        """Sanity check for quarantine configuration."""
        sanity = True
        # Check that trigger group names is a list
        if not isinstance(self.quarantine_trigger_group_names, list):
            hc_util.log_stderr("Quarantine trigger group names\"%s\" failed sanity check." %  # NOQA
                               self.quarantine_trigger_group_names)
            sanity = False
        # Check that trigger events is a list
        if not isinstance(self.quarantine_trigger_events, list):
            hc_util.log_stderr("Quarantine trigger events \"%s\" failed sanity check." %  # NOQA
                               self.quarantine_trigger_events)
            sanity = False
        # Check that quarantine group name is a string
        if not isinstance(self.quarantine_group_name, str):
            hc_util.log_stderr("Quarantine group name \"%s\" failed sanity check." %  # NOQA
                               self.quarantine_group_name)
            sanity = False
        # Check that lists and strings are not zero-length
        for x in [self.quarantine_trigger_events,
                  self.quarantine_trigger_group_names,
                  self.quarantine_group_name]:
            if len(x) == 0:
                hc_util.log_stderr("Quarantine config has empty field(s)")
                sanity = False
        return sanity

    def ipblocker_config_is_sane(self):
        """Sanity check for IP blocker configuration."""
        sanity = True
        # Check that trigger group names is a list
        if not isinstance(self.ip_zone_name, str):
            hc_util.log_stderr("IP Blocker IP zone name failed sanity check.")
            sanity = False
        # Check that trigger events is a list
        if not isinstance(self.ipblocker_trigger_events, list):
            hc_util.log_stderr("IP Blocker trigger events failed sanity check.")  # NOQA
            sanity = False
        # Check that list and string values are not zero-length
        for x in [self.ipblocker_trigger_events,
                  self.ip_zone_name]:
            if len(x) == 0:
                hc_util.log_stderr("IP Blocker config has empty field(s)")
                sanity = False
        return sanity