cloudpassage/don-bot

View on GitHub
app/cortexlib/quarantine.py

Summary

Maintainability
A
1 hr
Test Coverage
"""Quarantine functionality is managed here."""
import cloudpassage
from halocelery.apputils import Utility as hc_util


class Quarantine(object):
    """Instantiate with a `config` object."""

    def __init__(self, config):
        self.quarantine_enable = config.quarantine_enable
        self.quarantine_group_name = config.quarantine_group_name
        self.quarantine_trigger_group_names = config.quarantine_trigger_group_names  # NOQA
        self.quarantine_trigger_events = config.quarantine_trigger_events
        self.quarantine_trigger_only_on_critical = config.quarantine_trigger_only_on_critical  # NOQA
        self.session = cloudpassage.HaloSession(config.halo_api_key,
                                                config.halo_api_secret_key,
                                                api_host=config.halo_api_host,
                                                api_port=config.halo_api_port,
                                                integration_string=config.ua)

    def should_quarantine(self, event):
        """Return an event object, or False if quarantine should not happen."""
        # If quarantine is not enabled, bail now.
        if self.quarantine_enable is False:
            return False
        event["quarantine_group"] = self.quarantine_group_name
        # If criteria are met and configuration is sane, trigger quarantine.
        if (self.group_name_match(event) is True and
                self.event_type_match(event) is True and
                self.criticality_match(event) is True and
                self.config_is_unambiguous() is True):
            return event
        # Don't trigger by default.
        return False

    def get_all_server_groups(self):
        """Get a list of groups from the Halo API."""
        try:
            server_groups = cloudpassage.ServerGroup(self.session)
            all_groups = server_groups.list_all()
        except cloudpassage.CloudPassageAuthentication:
            hc_util.log_stderr("Quarantine: Authentication failure with CloudPassage API!")  # NOQA
            all_groups = []
        return all_groups

    def config_is_unambiguous(self):
        """Ensure that configuration is unambiguous.

        Every goup named in quarantine config must have exactly one
        match in the Halo account.  If more than one exists, bail because
        of ambiguous configuration.
        """
        retval = True
        reason = ""
        all_groups = self.get_all_server_groups()
        if all_groups == []:
            hc_util.log_stderr("Quarantine: Unable to get server groups from Halo! Check your API credentials!")  # NOQA
            return False
        all_configured_groups = []
        all_configured_groups.append(self.quarantine_group_name)
        all_configured_groups.extend(self.quarantine_trigger_group_names[:])
        # Ensure that no more than one group exists per source group name.
        for group in all_configured_groups:
            groups = [x for x in all_groups if x["name"] == group]
            if len(groups) == 0:
                reason += "Quarantine: There is no group named %s\n" % group
                retval = False
            elif len(groups) > 1:
                reason += "Quarantine: More than one group named %s\n" % group
                retval = False
        if retval is False:
            hc_util.log_stderr("Quarantine: Group configuration is ambiguous:\n%s" % reason)  # NOQA
        return retval

    def criticality_match(self, event):
        """Return True if the event's criticality meets requirements."""
        if self.quarantine_trigger_only_on_critical is False:
            return True
        elif (self.quarantine_trigger_only_on_critical is True and
              event["critical"] is True):
            return True
        else:
            return False

    def event_type_match(self, event):
        """Return boolean for event type match."""
        if event["type"] in self.quarantine_trigger_events:
            return True
        else:
            return False

    def group_name_match(self, event):
        """Return bool for server group name match."""
        # If this is not a workload event, return False
        if "server_group_name" not in event:
            return False
        if event["server_group_name"] in self.quarantine_trigger_group_names:
            return True
        else:
            return False