SpamExperts/OrangeAssassin

View on GitHub
oa/plugins/spf.py

Summary

Maintainability
D
2 days
Test Coverage
""" SPF Plugin."""

from __future__ import absolute_import

import re
from builtins import str

import spf

import oa.plugins.base
from oa.regex import Regex

RECEIVED_RE = Regex(r"""
    ^(pass|neutral|(?:soft)?fail|none|
    permerror|temperror)
    \b(?:.*\bidentity=(\S+?);?\b)?
""", re.I | re.S | re.X | re.M)
AUTHRES_SPF = Regex(r'.*;\s*spf\s*=\s*([^;]*)', re.I | re.S | re.X | re.M)
AUTHRES_RE = Regex(r"""
    ^(pass|neutral|(?:hard|soft)?fail|none|
    permerror|temperror)(?:[^;]*?
    \bsmtp\.(\S+)\s*=[^;]+)?
""", re.I | re.S | re.X | re.M)


class SpfPlugin(oa.plugins.base.BasePlugin):
    spf_check = False
    spf_check_helo = False
    no_valid_identity = False
    eval_rules = (
        "check_for_spf_pass",
        "check_for_spf_neutral",
        "check_for_spf_none",
        "check_for_spf_fail",
        "check_for_spf_softfail",
        "check_for_spf_permerror",
        "check_for_spf_temperror",
        "check_for_spf_helo_pass",
        "check_for_spf_helo_neutral",
        "check_for_spf_helo_none",
        "check_for_spf_helo_fail",
        "check_for_spf_helo_softfail",
        "check_for_spf_helo_permerror",
        "check_for_spf_helo_temperror",
        "check_for_spf_whitelist_from",
        "check_for_def_spf_whitelist_from"
    )
    options = {
        "whitelist_from_spf": ("append_split", []),
        "def_whitelist_from_spf": ("append_split", []),
        "spf_timeout": ("timevalue", 5),
        "do_not_use_mail_spf": ("bool", False),
        "do_not_use_mail_spf_query": ("bool", False),
        "ignore_received_spf_header": ("bool", False),
        "use_newest_received_spf_header": ("bool", False)
    }
    check_result = {
        "check_spf_pass": 0,
        "check_spf_neutral": 0,
        "check_spf_none": 0,
        "check_spf_fail": 0,
        "check_spf_softfail": 0,
        "check_spf_permerror": 0,
        "check_spf_temperror": 0,
        "check_spf_helo_pass": 0,
        "check_spf_helo_neutral": 0,
        "check_spf_helo_none": 0,
        "check_spf_helo_fail": 0,
        "check_spf_helo_softfail": 0,
        "check_spf_helo_permerror": 0,
        "check_spf_helo_temperror": 0,
        "check_spf_whitelist_from": 0,
        "check_def_spf_whitelist_from": 0
    }

    def parsed_metadata(self, msg):
        if self.get_global("ignore_received_spf_header"):
            # The plugin will ignore the spf headers and will perform
            # SPF check by itself by querying the dns
            if msg.get_decoded_header("received"):
                self.received_headers(msg, '')
                if msg.sender_address:
                    self.received_headers(msg, msg.sender_address)
        else:
            # # The plugin will try to use the SPF results found in any
            # # Received-SPF headers it finds in the message that could only
            # # have been added by an internal relay
            self.check_spf_header(msg)

    def check_for_spf_pass(self, msg, target=None):
        return self.check_result["check_spf_pass"] == 1

    def check_for_spf_neutral(self, msg, target=None):
        return self.check_result["check_spf_neutral"] == 1

    def check_for_spf_none(self, msg, target=None):
        return self.check_result["check_spf_none"] == 1

    def check_for_spf_fail(self, msg, target=None):
        return self.check_result["check_spf_fail"] == 1

    def check_for_spf_softfail(self, msg, target=None):
        return self.check_result["check_spf_softfail"] == 1

    def check_for_spf_permerror(self, msg, target=None):
        return self.check_result["check_spf_permerror"] == 1

    def check_for_spf_temperror(self, msg, target=None):
        return self.check_result["check_spf_temperror"] == 1

    def check_for_spf_helo_pass(self, msg, target=None):
        return self.check_result["check_spf_helo_pass"] == 1

    def check_for_spf_helo_neutral(self, msg, target=None):
        return self.check_result["check_spf_helo_neutral"] == 1

    def check_for_spf_helo_none(self, msg, target=None):
        return self.check_result["check_spf_helo_none"] == 1

    def check_for_spf_helo_fail(self, msg, target=None):
        return self.check_result["check_spf_helo_fail"] == 1

    def check_for_spf_helo_softfail(self, msg, target=None):
        return self.check_result["check_spf_helo_softfail"] == 1

    def check_for_spf_helo_permerror(self, msg, target=None):
        return self.check_result["check_spf_helo_permerror"] == 1

    def check_for_spf_helo_temperror(self, msg, target=None):
        return self.check_result["check_spf_helo_temperror"] == 1

    def check_for_spf_whitelist_from(self, msg, target=None):
        return self.check_spf_whitelist(msg, "whitelist_from_spf")

    def check_for_def_spf_whitelist_from(self, msg, target=None):
        return self.check_spf_whitelist(msg, "def_whitelist_from_spf")

    def check_spf_whitelist(self, msg, list_name):
        parsed_list = self.parse_list(list_name)
        if self[list_name]:
            if not self.check_for_spf_pass(msg):
                return False
        for regex in parsed_list:
            if re.match(regex, msg.sender_address):
                return True
        return False

    def parse_list(self, list_name):
        parsed_list = []
        characters = ["?", "@", ".", "*@"]
        for addr in self[list_name]:
            if len([e for e in characters if e in addr]):
                address = re.escape(addr).replace(r"\*", ".*").replace(r"\?",
                                                                       ".?")
                if "@" in address:
                    parsed_list.append(address)
                else:
                    parsed_list.append(".*@" + address)
        return parsed_list

    def check_spf_header(self, msg):
        authres_header = msg.msg["authentication-results"]
        received_spf_headers = msg.get_decoded_header("received-spf")
        if not self["use_newest_received_spf_header"]:
            received_spf_headers.reverse()
        if received_spf_headers:
            self.check_spf_received_header(received_spf_headers)
            if not self.no_valid_identity:
                self.received_headers(msg, '')
        elif authres_header:
            self.check_authres_header(authres_header)

        if msg.msg["received"]:
            if not received_spf_headers:
                self.received_headers(msg, '')
            if msg.sender_address:
                if self.spf_check_helo:
                    self.received_headers(msg, msg.sender_address)
                else:
                    self.received_headers(msg, '')

    def check_spf_received_header(self, received_spf_headers):
        for spf_header in received_spf_headers:
            match = RECEIVED_RE.match(spf_header)
            if not match:
                self.ctxt.log.debug("PLUGIN::SPF: invalid Received_SPF "
                                    "header")
                continue
            result = match.group(1)
            if match.group() == result:
                identity = ''
            elif match.group(2).lower() != 'none':
                identity = match.group(2)
            else:
                continue
            if identity:
                if identity.lower() in ('mfrom', 'mailfrom'):
                    if self.spf_check:
                        continue
                    identity = ''
                    self.spf_check = True
                elif identity == 'helo':
                    if self.spf_check_helo:
                        continue
                    self.spf_check_helo = True
                else:
                    continue
            elif self.spf_check:
                continue

            if not identity:
                self.no_valid_identity = True

            result.replace("error", "temperror")
            if identity:
                spf_identity = "check_spf_%s_%s" % (identity, result)
            else:
                spf_identity = "check_spf_%s" % result
                self.spf_check = True
            self.check_result[spf_identity] = 1
        if self.spf_check and self.spf_check_helo:
            return

    def check_authres_header(self, authres_header):
        self.ctxt.log.debug("PLUGIN::SPF: %s",
                            "found an Authentication-Results header "
                            "added by an internal host")

        extract_spf = AUTHRES_SPF.match(authres_header)
        match = None
        if extract_spf:
            match = AUTHRES_RE.match(extract_spf.group(1))
        if match:
            result = 'fail' if match.group(
                1) == 'hardfail' else match.group(1)
            identity = str(match.group(2))
            if identity in ('mfrom', 'mailfrom', 'None'):
                identity = ''
            elif identity == 'helo':
                identity = 'helo'
            if identity:
                spf_identity = "check_spf_%s_%s" % (identity, result)
            else:
                spf_identity = "check_spf_%s" % result
            self.check_result[spf_identity] = 1

    def received_headers(self, msg, sender):
        timeout = self.get_global("spf_timeout")
        if not msg.external_relays:
            return
        mx = msg.external_relays[0]['helo']
        ip = msg.external_relays[0]['ip']

        spf_result = self._query_spf(timeout, ip, mx, sender)
        if spf_result == "error":
            spf_result = "temperror"
        if self.spf_check_helo:
            spf_identity = "check_spf_%s" % spf_result
            self.check_result[spf_identity] = 1
        elif re.match(".*\..*", mx):
            spf_identity = "check_spf_helo_%s" % spf_result
            self.spf_check_helo = True
            self.check_result[spf_identity] = 1
        else:
            self.spf_check_helo = True

    def _query_spf(self, timeout, ip, mx, sender_address):
        self.ctxt.log.debug("SPF::Plugin %s",
                            "Querying the dns server(%s, %s, %s)..."
                            % (ip, mx, sender_address))
        result, comment = spf.check2(i=str(ip), s=sender_address,
                                     h=mx, timeout=timeout, querytime=timeout)
        return result