SpamExperts/OrangeAssassin

View on GitHub
oa/conf.py

Summary

Maintainability
C
1 day
Test Coverage
"""Manage configuration option parsing."""

import oa.errors

from datetime import timedelta


class Conf(object):
    """Parses and stores values for options in the global
    context. The options must be defined in the `options`
    class attribute in the following format::

        {
            "my_option": ("str", "default value"),
        }

    Support types are:

    * int
    * float
    * bool
    * str
    * list
    * append
    * append_split
    * clear

    See the corresponding `set_*_option` method for details
    on each of them.
    """
    options = None

    def __init__(self, context):
        """Initialize the plugin and parses all options specified in
        options.
        """
        self.ctxt = context
        self._plugin_name = self.__class__.__name__
        if self.options:
            for key, (dummy, value) in self.options.items():
                self.set_global(key, value)

    def __setitem__(self, key, value):
        self.set_global(key, value)

    def __getitem__(self, item):
        return self.get_global(item)

    def __delitem__(self, key):
        self.del_global(key)

    def set_global(self, key, value):
        """Store data in the global context"""
        self.ctxt.set_plugin_data(self._plugin_name, key, value)

    def get_global(self, key=None):
        """Get data from the global context"""
        return self.ctxt.get_plugin_data(self._plugin_name, key)

    def del_global(self, key=None):
        """Delete data from the global context"""
        return self.ctxt.del_plugin_data(self._plugin_name, key)

    def set_local(self, msg, key, value):
        """Store data in the local message context"""
        msg.set_plugin_data(self._plugin_name, key, value)

    def get_local(self, msg, key=None):
        """Get data from the local message context"""
        return msg.get_plugin_data(self._plugin_name, key)

    def del_local(self, msg, key=None):
        """Delete data from the local message context"""
        return msg.del_plugin_data(self._plugin_name, key)

    def set_int_option(self, global_key, value):
        """Parse and set a integer option."""
        try:
            if value:
                self.set_global(global_key, int(value))
        except ValueError:
            raise oa.errors.PluginError("Invalid value for %s: %s" %
                                        (global_key, value))

    def set_float_option(self, global_key, value):
        """Parse and set a float option."""
        try:
            if value:
                self.set_global(global_key, float(value))
        except ValueError:
            raise oa.errors.PluginError("Invalid value for %s: %s" %
                                        (global_key, value))

    def set_timevalue_option(self, global_key, value):
        """Parse and set a timevalue option."""
        unit = ""
        try:
            if value:
                value = float(value)
        except ValueError:
            unit = value[-1:]
            value = value[:-1]
            try:
                if value:
                    value = float(value)
            except ValueError:
                raise oa.errors.PluginError("Invalid value for %s: %s" %
                                            (global_key, value))
        if not isinstance(value, float) or value < 0:
            return
            # raise oa.errors.PluginError("Negative value for %s: %s" %
            #                              (global_key, value))
        if unit:
            if unit == "m":
                time_unit = timedelta(minutes=value)
            elif unit == "h":
                time_unit = timedelta(hours=value)
            elif unit == "d":
                time_unit = timedelta(days=value)
            elif unit == "w":
                time_unit = timedelta(weeks=value)
            elif unit == "s":
                time_unit = timedelta(seconds=value)
            else:
                return

            value = time_unit.total_seconds()

        self.set_global(global_key, value)

    def set_bool_option(self, global_key, value):
        """Parse and set a bool option."""
        self.set_global(global_key, value.lower() in ("1", "true"))

    def set_str_option(self, global_key, value):
        """Parse and set a string option."""
        self.set_global(global_key, value)

    def set_list_option(self, global_key, value, separator=","):
        """Parse and set a list option."""
        self.set_global(global_key, value.split(separator))

    def set_append_option(self, key, value):
        """This option can be specified multiple times and the
        result are appended to the list.
        """
        self.get_global(key).append(value)

    def set_append_split_option(self, key, value, separator=None):
        """This option can be specified multiple times and the
        result are appended to the list.

        The values themselves can also be list of separated by
        whitespace.
        """
        # WHY?! :/ Who would think this is a good idea?
        self.get_global(key).extend(value.split(separator))

    def set_clear_option(self, key, value):
        """Clear the current option's value and replace it
        with the default.
        """
        real_keys = self.options[key][1]
        for real_key in real_keys:
            self.set_global(real_key, [])

    def inhibit_further_callbacks(self):
        """Tells the plugin handler to inhibit calling into other plugins in
        the plugin chain for the current callback.
        """
        raise oa.errors.InhibitCallbacks()

    def parse_config(self, key, value):
        """Parse a config line that the normal parses doesn't know how to
        interpret.

        Use self.inhibit_further_callbacks to stop other plugins from
        processing this line.

        May be overridden.
        """
        if self.options and key in self.options:
            set_func = getattr(self, "set_%s_option" % self.options[key][0])
            set_func(key, value)
            self.inhibit_further_callbacks()


class PADConf(Conf):
    """Main configuration of OrangeAssassin"""

    options = {
        "report": ("append", []),
        "clear_report_template": ("clear", ["report"]),
        "unsafe_report": ("append", []),
        "clear_unsafe_report_template": ("clear", ["unsafe_report"]),
        "report_contact": ("str", ""),
        "report_safe": ("int", 1),
        "add_header": ("append", []),
        "remove_header": ("append", []),
        "clear_headers": ("clear", ["add_header", "remove_header"]),
        "required_score": ("float", 5.0),
        "use_bayes": ("bool", True),
        "use_network": ("bool", True),
        "envelope_sender_header": ("append", []),
        "dns_server": ("append", []),
        "clear_dns_servers": ("clear", ["dns_server"]),
        "default_dns_lifetime": ("float", 10.0),
        "default_dns_timeout": ("float", 2.0),
        "allow_user_rules": ("bool", False),
        "skip_rbl_checks": ("bool", 0),
        "trusted_networks": ("append_split", []),
        "clear_trusted_networks": ("clear", ["trusted_networks"]),
        "internal_networks": ("append_split", []),
        "clear_internal_networks": ("clear", ["internal_networks"]),
        "msa_networks": ("append_split", []),
        "clear_msa_networks": ("clear", ["msa_networks"]),
        "originating_ip_headers": ("append_split", []),
        "clear_originating_ip_headers": ("clear", ["originating_ip_headers"]),
        "clear_dns_query_restriction": ("clear", ["dns_query_restriction"]),
        "always_trust_envelope_sender": ("int", 0),
        "dns_available": ("str", "yes"),
        "dns_local_ports_permit": ("append_split", []),
        "dns_local_ports_avoid": ("append_split", []),
        "dns_test_interval": ("str", "600"),
        "dns_options": ("str", "norotate, nodns0x20, edns=4096"),
        "dns_query_restriction": ("append", []),
        "autolearn": ("bool", False),
        "training": ("bool", False),
        "user_config": ("bool", True),
        "ok_locales": ("str", ""),
    }