SpamExperts/OrangeAssassin

View on GitHub
oa/plugins/header_eval.py

Summary

Maintainability
F
1 wk
Test Coverage
"""Expose some eval rules that do checks on the headers."""

from __future__ import absolute_import
from __future__ import division

import re
import time
import datetime
import itertools
import email.utils
import email.header

from dateutil import relativedelta

import oa.locales
import oa.plugins.base
from oa.received_parser import IP_ADDRESS
from oa.regex import Regex


class HeaderEval(oa.plugins.base.BasePlugin):
    hotmail_addr_with_forged_hotmail_received = 0
    hotmail_addr_but_no_hotmail_received = 0
    tocc_sorted_count = 7
    tocc_similar_count = 5
    tocc_similar_length = 2

    eval_rules = (
        "check_for_fake_aol_relay_in_rcvd",
        "check_for_faraway_charset_in_headers",
        "check_for_unique_subject_id",
        "check_illegal_chars",
        "check_for_forged_hotmail_received_headers",
        "check_for_no_hotmail_received_headers",
        "check_for_msn_groups_headers",
        "check_for_forged_eudoramail_received_headers",
        "check_for_forged_yahoo_received_headers",
        "check_for_forged_juno_received_headers",
        "check_for_matching_env_and_hdr_from",
        "sorted_recipients",
        "similar_recipients",
        "check_for_missing_to_header",
        "check_for_forged_gw05_received_headers",
        "check_for_shifted_date",
        "subject_is_all_caps",
        "check_for_to_in_subject",
        "check_outlook_message_id",
        "check_messageid_not_usable",
        "check_header_count_range",
        "check_unresolved_template",
        "check_ratware_name_id",
        "check_ratware_envelope_from",
        "gated_through_received_hdr_remover",
        "check_equal_from_domains",
        "received_within_months"
    )

    options = {
        "util_rb_tld": ("append_split", []),
        "util_rb_2tld": ("append_split", []),
        "util_rb_3tld": ("append_split", []),
    }

    def check_for_fake_aol_relay_in_rcvd(self, msg, target=None):
        """Check for common AOL fake received header."""
        for recv in msg.get_decoded_header("Received"):
            if not Regex(r" rly-[a-z][a-z]\d\d\.", re.I).search(recv):
                continue
            if Regex(r"\/AOL-\d+\.\d+\.\d+\)").search(recv):
                continue
            if Regex(r"ESMTP id (?:RELAY|MAILRELAY|MAILIN)").search(recv):
                continue
            return True
        return False

    def check_for_faraway_charset_in_headers(self, msg, target=None):
        """Check if the Subject/From header is in a NOT ok locale.

        This eval rule requires the ok_locales setting configured,
        and not set to ALL.
        """
        ok_locales = self.ctxt.conf.get_global("ok_locales")
        if not ok_locales or ok_locales.lower() == "all":
            return False
        ok_locales = ok_locales.split()

        # XXX We should really be checking ALL headers here,
        # XXX not just Subject and From.
        for header_name in ("Subject", "From"):
            for header in msg.get_raw_header(header_name):
                try:
                    decoded_header = email.header.decode_header(header)
                except (ValueError, email.header.HeaderParseError):
                    continue

                for value, charset in decoded_header:
                    if not oa.locales.charset_ok_for_locales(
                            charset, ok_locales):
                        return True

        return False

    def check_for_unique_subject_id(self, msg, target=None):
        """Check if in subject appears an unique id"""
        subject = "".join(msg.get_decoded_header("Subject"))
        id = None
        unique_id_re_list = [
            r"[-_\.\s]{7,}([-a-z0-9]{4,})$",
            r"\s{10,}(?:\S\s)?(\S+)$",
            r"\s{3,}[-:\#\(\[]+([-a-z0-9]{4,})[\]\)]+$",
            r"\s{3,}[-:\#]([a-z0-9]{5,})$",
            r"[\s._]{3,}([^0\s._]\d{3,})$",
            r"[\s._]{3,}\[(\S+)\]$",

            # (7217vPhZ0-478TLdy5829qicU9-0@26) and similar
            r"\(([-\w]{7,}\@\d+)\)$",
            r"\b(\d{7,})\s*$",

            # stuff at end of line after "!" or "?" is usually an id
            r"[!\?]\s*(\d{4,}|\w+(-\w+)+)\s*$",

            # 9095IPZK7-095wsvp8715rJgY8-286-28 and similar
            # excluding 'Re:', etc and the first word
            r"(?:\w{2,3}:\s)?\w+\s+(\w{7,}-\w{7,}(-\w+)*)\s*$",

            # #30D7 and similar
            r"\s#\s*([a-f0-9]{4,})\s*$"
        ]
        for rgx in unique_id_re_list:
            match = Regex(rgx, re.I).search(subject)
            if match:
                id = match.group()
                break
        if not id:
            return False
        comercial_re = Regex(r"(?:item|invoice|order|number|confirmation)"
                             r".{1,6}%s\s*$" % id, re.X | re.I)
        if Regex(r"\d{5,}").search(id) and comercial_re.search(subject):
            return False
        return True

    def check_illegal_chars(self, msg, header, ratio, count, target=None):
        """look for 8-bit and other illegal characters that should be MIME
        encoded, these might want to exempt languages that do not use
        Latin-based alphabets, but only if the user wants it that way
        """
        try:
            ratio = float(ratio)
        except ValueError:
            self.ctxt.log.warn("HeaderEval::Plugin check_illegal_chars "
                                  "invalid option: %s", ratio)
            return False
        try:
            count = int(count)
        except ValueError:
            self.ctxt.log.warn("HeaderEval::Plugin check_illegal_chars "
                                  "invalid option: %s", count)
            return False
        if header == 'ALL':
            raw_headers = msg.raw_headers
            key_headers = []
            for keys in raw_headers.keys():
                key_headers.append(keys)
            for key in key_headers:
                if key.lower() in ("subject", "from"):
                    try:
                        del raw_headers[key]
                    except KeyError:
                        pass
        else:
            raw_headers = {header: msg.get_raw_header(header)}

        # count illegal substrings (RFC 2045)
        # (non-ASCII + C0 controls except TAB, NL, CR)

        raw_str = ''.join([''.join(value) for value in raw_headers.values()])
        try:
            raw_str = raw_str.decode("utf-8")
        except AttributeError:
            # in Python 3 all string is unicode object
            pass
        clean_hdr = ''.join([i if ord(i) < 128 else '' for i in raw_str])
        illegal = len(raw_str) - len(clean_hdr)
        if illegal > 0 and header.lower() == "subject":
            exempt = 0
            # only exempt a single cent sign, pound sign, or registered sign
            for except_chr in (u'\xa2', u'\xa3', u'\xae'):
                if except_chr in raw_str:
                    exempt += 1
            if exempt == 1:
                illegal -= exempt
        if raw_str:
            return (illegal / len(raw_str)) >= ratio and illegal >= count
        else:
            return False

    def check_for_forged_hotmail_received_headers(self, msg, target=None):
        """Check for forged hotmail received headers"""
        self._check_for_forged_hotmail_received_headers(msg)
        return self.hotmail_addr_with_forged_hotmail_received

    def check_for_no_hotmail_received_headers(self, msg, target=None):
        """Check for no hotmail received headers"""
        self._check_for_forged_hotmail_received_headers(msg)
        return self.hotmail_addr_but_no_hotmail_received

    def _check_for_forged_hotmail_received_headers(self, msg):
        self.hotmail_addr_but_no_hotmail_received = 0
        self.hotmail_addr_with_forged_hotmail_received = 0
        rcvd = msg.msg.get("Received")
        if not rcvd:
            return False
        pickup_service_regex = Regex(r"from mail pickup service by hotmail"
                                     r"\.com with Microsoft SMTPSVC;")
        if pickup_service_regex.search(rcvd):
            return False
        if self.check_for_msn_groups_headers(msg):
            return False
        ip_header = msg.msg.get("X-ORIGINATING-IP")
        if ip_header and IP_ADDRESS.search(ip_header):
            FORGED_REGEX = Regex(
                r"from\s+(?:\S*\.)?hotmail.com\s+\(\S+\.hotmail("
                r"?:\.msn)?\.com[\)]|"
                r"from\s+\S*\.hotmail\.com\s+\(\[{IP_ADDRESS}\]|"
                r"from\s+\S+\s+by\s+\S+\.hotmail(?:\.msn)?\.com\s+with\s+ "
                r"HTTP\;|"
                r"from\s+\[66\.218.\S+\]\s+by\s+\S+\.yahoo\.com"
                r"".format(IP_ADDRESS=IP_ADDRESS.pattern), re.I | re.X)
            if FORGED_REGEX.search(rcvd):
                return False
        if self.gated_through_received_hdr_remover(msg):
            return False

        helo_hotmail_regex = Regex(r"(?:from |HELO |helo=)\S*hotmail\.com\b")
        if helo_hotmail_regex.search(rcvd):
            self.hotmail_addr_with_forged_hotmail_received = 1
        else:
            from_address = msg.msg.get("From")
            if not from_address:
                from_address = ""
            if "hotmail.com" not in from_address:
                return False
            self.hotmail_addr_but_no_hotmail_received = 1

    def check_for_msn_groups_headers(self, msg, target=None):
        """Check if the email's destination is a msn group"""
        to = ''.join(msg.get_decoded_header('To'))
        if not Regex(r"<(\S+)\@groups\.msn\.com>").search(to):
            return False
        listname = Regex(r"<(\S+)\@groups\.msn\.com>").match(to).groups()[0]
        server_rgx = Regex(r"from mail pickup service by "
                           r"((?:p\d\d\.)groups\.msn\.com)\b")
        server = ''
        for rcvd in msg.get_decoded_header('Received'):
            if server_rgx.search(rcvd):
                server = server_rgx.search(rcvd).groups()[0]
                break
        if not server:
            return False
        message_id = ''.join(msg.get_decoded_header('Message-Id'))
        if listname == "notifications":
            if not Regex(r"^<\S+\@{0}".format(server)).search(message_id):
                return False
        else:
            msn_addr = Regex(r"^<{0}-\S+\@groups\.msn\.com>".format(listname))
            if not msn_addr.search(message_id):
                return False
            msn_addr = "{0}-bounce@groups.msn.com".format(listname)
            if msg.sender_address != msn_addr:
                return False
        return True

    def check_for_forged_eudoramail_received_headers(self, msg, target=None):
        """Check if the email has forged eudoramail received header"""
        from_addr = ''.join(msg.get_all_addr_header("From"))
        if from_addr.rsplit("@", 1)[-1] != "eudoramail.com":
            return False
        rcvd = ''.join(msg.get_decoded_header("Received"))
        ip = ''.join(msg.get_decoded_header("X-Sender-Ip"))
        if ip and IP_ADDRESS.search(ip):
            ip = True
        else:
            ip = False
        if self.gated_through_received_hdr_remover(msg):
            return False
        if Regex(r"by \S*whowhere.com\;").search(rcvd) and ip:
            return False
        return True

    def check_for_forged_yahoo_received_headers(self, msg, target=None):
        """Check for forged yahoo received headers"""
        from_addr = ''.join(msg.get_all_addr_header("From"))
        rcvd = ''.join(msg.get_decoded_header("Received"))
        if "yahoo.com" not in from_addr:
            return False
        if (msg.get_decoded_header("Resent-From") and
                msg.get_decoded_header("Resent-To")):
            xrcvd = ''.join(msg.get_decoded_header("X-Received"))
            rcvd = xrcvd if xrcvd else rcvd

        if self.gated_through_received_hdr_remover(msg):
            return False
        for relay in msg.untrusted_relays + msg.trusted_relays:
            rdns = relay.get("rdns")
            if rdns and "yahoo.com" in rdns:
                return False
        if Regex(r"by web\S+\.mail\S*\.yahoo\.com via HTTP").search(rcvd):
            return False
        if Regex(r"by smtp\S+\.yahoo\.com with SMTP").search(rcvd):
            return False
        yahoo_ip_re = Regex(
            r"from\s+\[{}\]\s+by\s+\S+\."
            r"(?:groups|scd|dcn)\.yahoo\.com\s+with\s+NNFMP".format(
                IP_ADDRESS.pattern), re.X)
        if yahoo_ip_re.search(rcvd):
            return False
        if (Regex(r"\bmailer\d+\.bulk\.scd\.yahoo\.com\b").search(rcvd) and
                    from_addr.rsplit("@", 1)[-1] == "reply.yahoo.com"):
            return False
        if Regex("by \w+\.\w+\.yahoo\.com \(\d+\.\d+\.\d+\/\d+\.\d+\.\d+\)"
                 "(?: with ESMTP)? id \w+").search(rcvd):
            return False
        return True

    def check_for_forged_juno_received_headers(self, msg, target=None):
        from_addr = ''.join(msg.get_all_addr_header("From"))
        if not from_addr.rsplit("@", 1)[-1].endswith("juno.com"):
            return False
        if self.gated_through_received_hdr_remover(msg):
            return False
        xorig = ''.join(msg.get_decoded_header("X-Originating-IP"))
        xmailer = ''.join(msg.get_decoded_header("X-Mailer"))
        rcvd = ''.join(msg.get_decoded_header("Received"))
        if xorig != "":
            juno_re = Regex(r"from.*\b(?:juno|untd)\.com.*"
                            r"[\[\(]{0}[\]\)].*by".format(IP_ADDRESS.pattern), re.X)
            cookie_re = Regex(r" cookie\.(?:juno|untd)\.com ")
            if not juno_re.search(rcvd) and not cookie_re.search(rcvd):
                return True
            if "Juno " not in xmailer:
                return True
        else:
            mail_com_re = Regex(r"from.*\bmail\.com.*\[{}\].*by".format(
                IP_ADDRESS.pattern), re.X)
            untd_com_re = Regex(r"from\s+(webmail\S+\.untd"
                                r"\.com)\s+\(\1\s+\[{}\]\)\s+by".format(
                IP_ADDRESS.pattern), re.X)
            if mail_com_re.search(rcvd) and not Regex(r"\bmail\.com").search(
                    xmailer):
                return True
            elif untd_com_re.search(rcvd) and not Regex(
                    r"^Webmail Version \d").search(xmailer):
                return True
            else:
                return True
        return False

    def check_for_matching_env_and_hdr_from(self, msg, target=None):
        from_addr = ''.join(msg.get_all_addr_header("From"))
        envfrom = ""
        for relay in msg.trusted_relays + msg.untrusted_relays:
            if relay.get('envfrom'):
                envfrom = relay.get('envfrom')
                break
        return envfrom in from_addr

    def _parse_rcpt(self, addr):
        user = addr[:self.tocc_similar_count]
        try:
            fqhn = addr.rsplit("@", 1)[1]
        except IndexError:
            fqhn = addr
        host = fqhn[:self.tocc_similar_length]
        return user, fqhn, host

    def _check_recipients(self, msg):
        """Check for similar recipients addresses.

        Return the ratio of possibly similar recipient of
        the total number of possible combinations.
        """
        try:
            return self.get_local(msg, "tocc_similar")
        except KeyError:
            pass

        recipients = []
        for header_name in ("To", "Cc", "Bcc", "ToCc"):
            recipients.extend(msg.get_all_addr_header(header_name))

        if not recipients:
            self.set_local(msg, "tocc_similar", 0)
            self.set_local(msg, "tocc_sorted", False)
            return

        sorted_recipients = sorted(recipients)
        self.set_local(msg, "tocc_sorted", sorted_recipients == recipients)
        # Remove dupes IF they are next to each other
        addresses = [recipients[0]]
        for rcpt1, rcpt2 in zip(recipients, recipients[1:]):
            if rcpt1 == rcpt2:
                continue
            addresses.append(rcpt2)

        if len(addresses) < self.tocc_similar_count:
            self.set_local(msg, "tocc_similar", 0)
            return

        hits = 0
        combinations = 0
        for rcpt1, rcpt2 in itertools.combinations(addresses, 2):
            user1, fqhn1, host1 = self._parse_rcpt(rcpt1)
            user2, fqhn2, host2 = self._parse_rcpt(rcpt2)
            combinations += 1
            if user1 == user2:
                hits += 1
            if host1 == host2 and fqhn1 != fqhn2:
                hits += 1
        ratio = hits / combinations
        self.set_local(msg, "tocc_similar", ratio)
        return ratio

    def sorted_recipients(self, msg, target=None):
        """Matches if the recipients are sorted"""
        self._check_recipients(msg)
        return self.get_local(msg, "tocc_sorted")

    def similar_recipients(self, msg, minr=None, maxr=None,
                           target=None):
        """Matches if the similar recipients ratio is in the
        specified .

        :param minr: The minimum for the ratio
        :param maxr: The maximum for the ratio
        """
        ratio = self._check_recipients(msg)
        try:
            return (
                (minr is None or float(minr) < ratio) and
                (maxr is None or ratio < float(maxr))
            )
        except (TypeError, ValueError):
            return False

    def check_for_missing_to_header(self, msg, target=None):
        """Check if the To header is missing."""
        if msg.get_raw_header("To"):
            return False
        if msg.get_raw_header("Apparently-To"):
            return False
        return True

    def check_for_forged_gw05_received_headers(self, msg, target=None):
        gw05_re = Regex(r"from\s(\S+)\sby\s(\S+)\swith\sESMTP\;\s+\S\S\S,"
                        r"\s+\d+\s+\S\S\S\s+\d{4}\s+\d\d:\d\d:\d\d\s+[-+]*"
                        r"\d{4}", re.X | re.I)
        for rcv in msg.get_decoded_header("Received"):
            h1 = ""
            h2 = ""
            try:
                match = gw05_re.match(rcv)
                if match:
                    h1, h2 = match.groups()
                if h1 and h2 and h2 != ".":
                    return True
            except IndexError:
                continue
        return False

    def _parse_rfc822_date(self, date):
        try:
            parsed_date = email.utils.parsedate(date)
            time_in_seconds = time.mktime(parsed_date)
            date_time = datetime.datetime.utcfromtimestamp(
                time_in_seconds)
        except ValueError:
            return
        return date_time

    def _get_date_header_time(self, msg):
        try:
            return self.get_local(msg, "date_header_time")
        except KeyError:
            pass

        date_time = None
        for header in ["Resent-Date", "Date"]:
            dates = msg.get_decoded_header(header)
            for date in dates:
                date_time = self._parse_rfc822_date(date)
                if date_time:
                    break
        if date_time:
            self.set_local(msg, "date_header_time", date_time)
        else:
            self.set_local(msg, "date_header_time", -1)
        return date_time

    def _get_received_header_times(self, msg):
        try:
            return self.get_local(msg, "received_header_times")
        except KeyError:
            pass

        received_times = []
        self.set_local(msg, "received_header_times", received_times)

        date_re = Regex(r"(\s.?\d+ \S\S\S \d+ \d+:\d+:\d+ \S+)")
        for rcvd in msg.get_decoded_header("Received"):
            try:
                date = date_re.search(rcvd).group()
            except (AttributeError, TypeError):
                date = None
            if not date:
                continue
            self.ctxt.log.debug("eval: trying Received header date for "
                                "real time: %s", date)
            received_time = self._parse_rfc822_date(date)
            if received_time:
                received_times.append(received_time)
        return received_times

    def _check_date_diff(self, msg):
        try:
            return self.get_local(msg, "date_diff")
        except KeyError:
            pass

        self.set_local(msg, "date_diff", datetime.timedelta(0))

        date_header_time = self._get_date_header_time(msg)
        received_header_times = self._get_received_header_times(msg)

        diffs = []
        for received_header_time in received_header_times:
            diff = abs(received_header_time - date_header_time)
            if diff:
                diffs.append(diff)

        try:
            date_diff = sorted(diffs)[0]
            self.set_local(msg, "date_diff", date_diff)
            return date_diff
        except IndexError:
            self.set_local(msg, "date_diff", datetime.timedelta(0))
            return datetime.timedelta(0)

    def subject_is_all_caps(self, msg, target=None):
        """Checks if the subject is all capital letters.

        This eval rule ignore short subjects, one word subject and
        the prepended notations. (E.g. ``Re:``)
        """
        for subject in msg.get_decoded_header("Subject"):
            # Remove the Re/Fwd notations in the subject
            subject = Regex(r"^(Re|Fwd|Fw|Aw|Antwort|Sv):", re.I).sub("",
                                                                     subject)
            subject = subject.strip()
            if len(subject) < 10:
                # Don't match short subjects
                continue
            if len(subject.split()) == 1:
                # Don't match one word subjects
                continue
            if subject.isupper():
                return True
        return False

    def check_for_to_in_subject(self, msg, test, target=None):
        """
        Check if to address is in Subject field.

        If it is called with 'address', check if full address is in subject,
        else if the parameter is 'user', then check if user name is in subject.
        """
        full_to = msg.get_all_addr_header('To')
        if not full_to:
            return False
        subject = msg.msg.get('Subject', "")
        for to in full_to:
            if test == "address":
                subject_regex = Regex(r".*" + re.escape(to) + r".*", re.I)
                if subject_regex.search(subject):
                    return True
            elif test == "user":
                regex = re.match("(\S+)@.*", to)
                if regex:
                    to = regex.group(1)
                    if Regex(r"^" + re.escape(to) + "$").search(subject):
                        return True
                    if Regex(r"(?:re|fw):\s*(?:\w+\s+)?" + re.escape(to) + "$")\
                            .search(subject):
                        return True
                    if Regex(r"\s*" + re.escape(to) + "[,:;!?-]$")\
                            .search(subject):
                        return True
                    if Regex(r"^" + re.escape(to) + "\s*[,:;!?-](\s).*")\
                            .search(subject):
                        return True
        return False

    def check_outlook_message_id(self, msg, target=None):
        message_id = msg.msg.get("Message-ID")
        if not message_id:
            return
        msg_regex = Regex(r"^<[0-9a-f]{4}([0-9a-f]{8})\$[0-9a-f]{8}\$["
                          r"0-9a-f]{8}\@")
        regex = msg_regex.search(message_id)
        if not regex:
            return False
        timetocken = int(regex.group(1), 16)

        date = msg.msg.get("Date")
        x = 0.0023283064365387
        y = 27111902.8329849
        mail_date = time.mktime(email.utils.parsedate(date))
        expected = int((mail_date * x) + y)
        if abs(timetocken - expected) < 250:
            return False
        received = msg.msg.get("Received")
        received_regex = Regex(r"(\s.?\d+ \S\S\S \d+ \d+:\d+:\d+ \S+).*?$")
        regex = received_regex.search(received)
        received_date = 0
        if regex:
            received_date = time.mktime(email.utils.parsedate(regex.group()))
        expected = int((received_date * x) + y)
        return abs(timetocken - expected) >= 250

    def check_messageid_not_usable(self, msg, target=None):
        list_unsubscribe = msg.msg.get("List-Unsubscribe")
        if list_unsubscribe:
            if Regex(r"<mailto:(?:leave-\S+|\S+-unsubscribe)\@\S+>$").search(
                    list_unsubscribe):
                return True
        if self.gated_through_received_hdr_remover(msg):
            return True
        received = msg.msg.get("Received")
        if Regex(r"/CWT/DCE\)").search(received):
            return True
        if Regex(r"iPlanet Messaging Server").search(received):
            return True
        return False

    def check_header_count_range(self, msg, header, minr, maxr, target=None):
        """Check if the count of the header is withing the given range.
        The range is inclusive in both ranges.

        :param header: the header name
        :param minr: the minimum number of headers with the same name
        :param maxr: the minimum number of headers with the same name
        :return: True if the header count is withing the range.
        """
        raw_headers = set(msg.get_raw_header(header))
        return int(minr) <= len(raw_headers) <= int(maxr)

    def check_unresolved_template(self, msg, target=None):
        message = msg.raw_msg
        headers = message.split("\n")
        for header in headers:
            if Regex(r"%[A-Z][A-Z_-]").search(header) and not \
                    Regex(r"^(?:x-vms-to|x-uidl|x-face|to|cc|from|subject|"
                          r"references|in-reply-to|(?:x-|resent-|"
                          r"x-original-)?message-id):").search(header.lower()):
                return True
        return False

    def check_ratware_name_id(self, msg, target=None):
        """Check if message-id is ratware or not."""
        message_id = msg.msg.get("Message-Id")
        from_header = msg.msg.get("From")
        if not message_id and not from_header:
            return False
        regex = Regex(r"<[A-Z]{28}\.([^>]+?)>").search(message_id)
        if regex:
            if Regex(r"\"[^\"]+\"\s*<" + regex.group(1) + ">").search(
                    from_header):
                return True
        return False

    def check_in_TL_TLDS(self, address):
        if address in self["util_rb_tld"]:
            return True
        if address in self["util_rb_2tld"]:
            return True
        if address in self["util_rb_3tld"]:
            return True
        return False

    def is_domain_valid(self, domain):
        domain = domain.lower()
        if " " in domain:
            return False
        parts = domain.split(".")
        if len(parts) <= 1:
            return False
        elif not self.check_in_TL_TLDS(".".join(parts[1:])):
            return False
        return True

    def check_ratware_envelope_from(self, msg, target=None):
        """Check if envelope-from address is ratware or not."""
        to_header = msg.msg.get("To")
        envelope_from = msg.sender_address
        if not to_header or not envelope_from:
            return False
        if Regex(r"^SRS\d=").search(envelope_from):
            return False
        regex = Regex(r"^([^@]+)@(.+)$").search(to_header)
        if regex:
            user = regex.group(1)
            dom = regex.group(2)
            if not self.is_domain_valid(dom):
                return False
            if Regex(r"\b" + dom + "." + user + "@").search(envelope_from):
                return True
        return False

    def gated_through_received_hdr_remover(self, msg, target=None):
        """Check if the email is gated through ezmlm"""
        txt = ''.join(msg.get_decoded_header("Mailing-List"))
        rcvd = ''.join(msg.get_decoded_header("Received"))
        if Regex(r"^contact \S+\@\S+\; run by ezmlm$").search(txt):
            dlto = ''.join(msg.get_decoded_header("Delivered-To"))
            mailing_list_re = Regex(r"^mailing list \S+\@\S+")
            qmail_re = Regex(r"qmail \d+ invoked (?:from "
                             r"network|by .{3,20})\); \d+ ... \d+")
            if mailing_list_re.search(dlto) and qmail_re.search(rcvd):
                return True
        if not rcvd:
            return True
        if Regex(r"from groups\.msn\.com \(\S+\.msn\.com ").search(rcvd):
            return True
        return False

    def check_equal_from_domains(self, msg, target=None):
        """Check if the domain from `From` header and `EnvelopeFrom` header
        are different."""
        from_addr = ''.join(msg.get_all_addr_header("From"))
        envfrom = msg.sender_address
        if not envfrom or not from_addr:
            return False
        try:
            fromdomain = from_addr.rsplit("@", 1)[-1]
        except (IndexError, AttributeError):
            return False
        try:
            envfromdomain = envfrom.rsplit("@", 1)[-1]
        except (IndexError, AttributeError):
            return False
        self.ctxt.log.debug("eval: From 2nd level domain: %s, "
                            "EnvelopeFrom 2nd level domain: %s", fromdomain,
                            envfromdomain)
        if envfromdomain.lower() not in fromdomain.lower():
            return True
        return False

    def _check_date_received(self, msg):
        try:
            return self.get_local(msg, "date_received")
        except KeyError:
            pass

        all_times = self._get_received_header_times(msg) + [
            self._get_date_header_time(msg)
        ]
        median = int(len(all_times)/2)
        date_received = sorted(all_times, reverse=True)[median]
        self.set_local(msg, "date_received", date_received)
        return date_received

    def received_within_months(self, msg, min, max, target=None):
        """Check if the date from received is in past"""
        if min == "undef":
            min = None
        if max == "undef":
            max = None
        try:
            if min:
                min = int(min)
            if max:
                max = int(max)
        except ValueError:
            self.ctxt.log.warn("HeaderEval::Plugin received_within_months "
                               "min and max should be integer values")
            return False

        diff = relativedelta.relativedelta(
            datetime.datetime.utcnow(),
            self._check_date_received(msg)
        )

        number_months = (diff.year or 0) * 12 + (diff.months or 0)

        if ((not min or number_months >= min) and
                (not max or number_months < max)):
            return True
        return False

    def check_for_shifted_date(self, msg, min=None, max=None, target=None):
        """Check if the difference between Date header and date from received
        headers its between min,max interval

        * min: minimum time express in hours
        * max: maximum time express in hours
        """
        if min == "undef":
            min = None
        if max == "undef":
            max = None
        try:
            if min:
                min = int(min)
            if max:
                max = int(max)
        except ValueError:
            self.ctxt.log.warn("HeaderEval::Plugin check_for_shifted_date "
                               "min and max should be integer values")
            return False
        diff = int(self._check_date_diff(msg).total_seconds() / 3600)
        if ((not min or diff >= min) and
                (not max or diff < max)):
            return True
        return False