SpamExperts/OrangeAssassin

View on GitHub
oa/rules/ruleset.py

Summary

Maintainability
D
1 day
Test Coverage
"""A set of rules."""

from builtins import dict
from builtins import object
from builtins import str

import re
import socket
import email.utils
import collections
import email.message
import email.mime.text
import email.mime.base
import email.mime.multipart
from operator import itemgetter

import oa
import oa.errors
import oa.regex

# Matches a template tag such as _SCORE_: group 1 is the whole tag
# (underscores included), group 2 is the bare tag name.
_TAG_RE = oa.regex.Regex(r"(_([A-Z_]*?)_)")

# Pattern applied to the `dns_options` configuration value; it names the
# `edns`, `rotate` and `dns0x20` options.
# NOTE(review): the pattern body is wrapped in `[` ... `]`. Under standard
# `re` syntax that turns the whole body into one character class, so the
# named groups would never be captured -- confirm how oa.regex.Regex
# treats this and whether the brackets are intended.
_DNS_OPTIONS_RE = oa.regex.Regex(r"""
[
(?P<edns>(no)?edns0?=\d*)?,?
(?P<rotate>(no)?rotate)?,?
(?P<dns0x20>(no)?dns0x20)?,?
]
""", re.I | re.X | re.M)


class RuleSet(object):
    """A set of rules used to match against a message."""
    header_start = "X-Spam-"

    def __init__(self, ctxt):
        """Create a new empty RuleSet if paranoid is set to False any
        invalid rule is ignored.
        """
        self.ctxt = ctxt
        self.conf = ctxt.conf
        self.tags = set()
        # Store modification that need to be done to the message in
        # the following format:
        # (True/False, header_name, value)
        # Where the first argument is True if the header should be
        # removed instead of added.
        self.header_mod = {
            "spam": [],
            "ham": [],
            "all": [],
        }
        self.checked = collections.OrderedDict()
        self.not_checked = dict()
        # XXX Hardcoded at the moment, should be loaded from configuration.
        self.autolearn = False
        self.use_bayes = True
        self.use_network = True

    def _interpolate(self, text, msg):
        if msg.interpolate_data:
            return text % msg.interpolate_data

        spam = msg.score >= self.conf["required_score"]
        data = msg.interpolate_data
        # Initialize all tags with a empty value
        for tag in self.tags:
            data[tag] = "@@%s@@" % tag

        data["CONTACTADDRESS"] = self.conf["report_contact"]
        data["HOSTNAME"] = socket.gethostname()
        data["YESNOCAPS"] = "YES" if spam else "NO"
        data["YESNO"] = "Yes" if spam else "No"
        data["SCORE"] = "%0.1f" % msg.score
        data["REQD"] = "%0.1f" % self.conf["required_score"]
        data["SUBVERSION"] = oa.__release_date__
        data["VERSION"] = oa.__version__

        # Some of these tags are more expensive to create,
        # so only add them if they are required.
        if "REPORT" in self.tags:
            data["REPORT"] = self.get_matched_report(msg)
        if "TESTS" in self.tags:
            matched_rules = [name for name, result in msg.rules_checked.items()
                             if result]
            if not matched_rules:
                data["TESTS"] = "none"
            else:
                data["TESTS"] = ",".join(matched_rules)

        if "TESTSSCORES" in self.tags:
            matched_rules = ["%s=%s" % (name, int(result))
                             for name, result in msg.rules_checked.items()
                             if result]
            if not matched_rules:
                data["TESTSSCORES"] = "none"
            else:
                data["TESTSSCORES"] = ",".join(matched_rules)

        if "SUMMARY" in self.tags:
            data["SUMMARY"] = self.get_summary_report(msg)
        if "PREVIEW" in self.tags:
            preview = " ".join(msg.raw_text.split("\n", 3)[:3])[:200] + "[...]"
            data["PREVIEW"] = preview

        # Plugin can store custom tags in the the message
        # after they perform check. Add them to the data
        # as well.
        data.update(msg.plugin_tags)
        return text % msg.interpolate_data

    def add_rule(self, rule):
        """Add a rule to the ruleset, execute any pre and post processing
        that's defined for the rule.
        """
        rule.preprocess(self)
        if rule.should_check():
            self.checked[rule.name] = rule
        else:
            self.not_checked[rule.name] = rule
        rule.postprocess(self)

    def _convert_tags(self, text):
        """Turn every _TAG_ in `text` into a %(TAG)s placeholder.

        Surrounding quote characters are stripped first, and the name
        of each tag found is remembered in self.tags.
        """
        unquoted = text.strip("'\"")
        self.tags.update(found[1] for found in _TAG_RE.findall(unquoted))
        return _TAG_RE.sub(r"%(\2)s", unquoted)

    def get_report(self, msg):
        """Get the Spam report for this message

        :return: A string representing the report for this
          Spam message.

        """
        if not self.conf["report"]:
            return "\n(no report template found)\n"
        return self._interpolate(self.conf["report"], msg) + "\n"

    def get_unsafe_report(self, msg):
        if not self.conf["unsafe_report"]:
            return "\n(no report template found)\n"
        return self._interpolate(self.conf["unsafe_report"], msg) + "\n"

    def _add_header_rule(self, value, remove=False):
        """Add rule to add a header for the corresponding.

        The value must be in the following format:

         [all|spam|ham] [header_name] [header_value]

        If remove is set to True, then the header is removed
        instead of added.
        """
        self.ctxt.log.debug("Adding header rule: %s (%s)", value, remove)
        if not remove:
            msg_status, header_name, header_value = value.split(None, 2)
            header_value = self._convert_tags(header_value)
        else:
            msg_status, header_name = value.split(None, 1)
            header_value = None

        msg_status = msg_status.lower()
        if msg_status not in self.header_mod:
            raise oa.errors.InvalidRule("add_header", value)

        header_name = self.header_start + header_name

        self.header_mod[msg_status].append((remove, header_name, header_value))

    def get_adjusted_message(self, msg, header_only=False):
        """Get message adjusted by the rules."""
        spam = msg.score >= self.conf["required_score"]
        if not spam or header_only or self.conf["report_safe"] == 0:
            newmsg = email.message_from_string(msg.raw_msg)
        else:
            newmsg = self._get_bounce_message(msg)
        if self.conf["report_safe"] == 0:
            newmsg.add_header("X-Spam-Report",
                              self.get_matched_report(msg))
        self._adjust_headers(msg, newmsg, self.header_mod["all"])
        if spam:
            self._adjust_headers(msg, newmsg, self.header_mod["spam"])
        else:
            self._adjust_headers(msg, newmsg, self.header_mod["ham"])
        if header_only:
            return newmsg.as_string().split("\n\n", 1)[0] + "\n\n"
        return newmsg.as_string()

    def _adjust_headers(self, msg, newmsg, rules):
        """Adjust the headers of this message according to
        this list of rules. The rules are tuples in the following
        format:

        True/False, header_name, header_value

        If the first argument is True then remove the header
        instead of adding it.
        """
        for remove, name, value in rules:
            if remove:
                del newmsg[name]
            else:
                newmsg.add_header(name, self._interpolate(value, msg))

    def _get_bounce_message(self, msg):
        """Create a bounce message from the original.

        The result is a multipart/mixed message whose first part is the
        text report (get_report followed by get_unsafe_report) and
        whose second part carries the untouched original message.

        :param msg: the checked message; ``msg.msg`` supplies the
          original headers and ``msg.raw_msg`` the original text.
        :return: the new multipart message.
        """
        newmsg = email.mime.multipart.MIMEMultipart("mixed")
        newmsg["Received"] = (
            "from localhost by %s with OrangeAssassin (version %s); %s" %
            (socket.gethostname(), oa.__version__,
             email.utils.formatdate(localtime=True))
        )
        # Switched around: the original recipient becomes the sender of
        # the bounce and the original sender becomes its recipient.
        if "To" in msg.msg:
            newmsg["From"] = msg.msg['To']
        if "From" in msg.msg:
            newmsg["To"] = msg.msg['From']
        if "Subject" in msg.msg:
            newmsg["Subject"] = msg.msg["Subject"]
        # Fall back to the current time when the original has no Date.
        msg_date = msg.msg["Date"] or email.utils.formatdate(localtime=True)
        newmsg["Date"] = msg_date
        newmsg.preamble = "This is a multi-part message in MIME format."
        newmsg.epilogue = ""

        # The original is attached as message/rfc822 (the registered
        # media type for an encapsulated mail message), or as plain
        # text when report_safe is 2.
        attach_type = ("message", "rfc822")
        if self.conf["report_safe"] == 2:
            attach_type = ("text", "plain")

        report_message = self.get_report(msg) + self.get_unsafe_report(msg)

        newmsg.attach(email.mime.text.MIMEText(report_message))
        original_attachment = email.mime.base.MIMEBase(
                *attach_type, x_spam_type="original"
        )
        original_attachment.add_header("Content-Disposition", "inline")
        original_attachment.add_header("Content-Description",
                                       "original message before OrangeAssassin")
        original_attachment.set_payload(msg.raw_msg)
        newmsg.attach(original_attachment)
        return newmsg

    def get_matched_report(self, msg):
        """Get a report of rules that matched this message."""
        report = []
        for name, result in msg.rules_checked.items():
            if not result:
                continue
            rule = self.get_rule(name)
            report.append(
                "* %s %s %s%s" %
                (rule.score, rule.name, rule._rule_type, msg.rules_descriptions[name])
            )

        report = "\r\n".join(report)
        return "\r\n%s" % report

    def get_summary_report(self, msg):
        """Get summary report."""
        summary = []
        for name, result in msg.rules_checked.items():
            if not result:
                continue
            rule = self.get_rule(name)
            if rule.score == int(rule.score):
                score = str(int(rule.score)).rjust(4)
            else:
                score = ("%0.1f" % rule.score).rjust(4)
            summary.append(
                    "%s %s %s" %
                    (score, rule.name.ljust(22), rule.description)
            )
        return "\r\n".join(summary)

    def get_rule(self, name, checked_only=False):
        """Gets the rule with the given name. If checked_only is set to True
        then only returns the rule if it is going to be checked.

        Raises KeyError if no rule is found.
        """
        try:
            return self.checked[name]
        except KeyError:
            if checked_only:
                raise
        return self.not_checked[name]

    def post_parsing(self):
        """Run all post processing hooks."""
        self.checked = collections.OrderedDict(
            sorted(self.checked.items(), key=itemgetter(1), reverse=False))
        self.call_postparsing()
        # Convert some of the parsed information
        self.conf["report"] = "\n".join(
            self._convert_tags(value)
            for value in self.conf["report"]
        )
        self.conf["unsafe_report"] = "\n".join(
            self._convert_tags(value)
            for value in self.conf["unsafe_report"]
        )
        for value in self.conf["add_header"]:
            self._add_header_rule(value, False)
        for value in self.conf["remove_header"]:
            self._add_header_rule(value, True)

        for value in self.conf['dns_query_restriction']:
            try:
                option, qname = value.split(" ", 1)
            except ValueError:
                self.ctxt.log.info(
                    "Invalid value for dns_query_restriction: %s", value)

            if option not in ("allow", "deny"):
                self.ctxt.log.info(
                    "Invalid value for dns_query_restriction %s", value)
                continue
            self.ctxt.dns.query_restrictions[qname] = option == "deny"
        dns_options = {"edns": "edns=4096",
                       "rotate": "norotate",
                       "dns0x20": "nodns0x20"}
        dns_options_match = _DNS_OPTIONS_RE.match(self.conf['dns_options'])
        if dns_options_match:
            dns_options.update(dns_options_match.groupdict())
        self.ctxt.dns.rotate = dns_options['rotate']
        self.ctxt.dns.edns = dns_options['edns']

    def call_postparsing(self):
        """Call postparsing on ALL loaded rules."""
        for rule_list in (self.checked, self.not_checked):
            for name, rule in list(rule_list.items()):
                try:
                    rule.postparsing(self)
                except oa.errors.InvalidRule as e:
                    self.ctxt.err(e)
                    if self.ctxt.paranoid:
                        raise
                    del rule_list[name]

    def match(self, msg):
        """Match the message against all the rules in this ruleset."""
        try:
            for name, rule in self.checked.items():
                try:
                    result = rule.match(msg)
                except oa.errors.StopProcessing as e:
                    raise
                except Exception as e:
                    self.ctxt.log.critical("Unable to run rule %r: %s",
                                           name, e, exc_info=True)
                    result = False
                if isinstance(result, str):
                    msg.rules_descriptions[name] = result
                    result = True
                elif result:
                    msg.rules_descriptions[name] = rule.description
                self.ctxt.log.debug("Checked rule %s: %s", rule, result)
                msg.rules_checked[name] = result
                if result:
                    msg.score += rule.score
        except oa.errors.StopProcessing as e:
            self.ctxt.log.debug("Stop processing the messages as "
                                "requested: %s", e)
        self.ctxt.hook_check_end(self, msg)
        self.ctxt.hook_auto_learn(self, msg)