SpamExperts/OrangeAssassin

View on GitHub
oa/plugins/body_eval.py

Summary

Maintainability
A
1 hr
Test Coverage
"""Expose some eval rules that do checks on the body."""

from __future__ import division
from __future__ import absolute_import

import re
import collections

import oa.plugins.base

from oa.regex import Regex

# Matches a single whitespace character; iterated over to count the
# whitespace characters of a text.
SPACE_COUNT = Regex(r"\s")
# Matches a run of whitespace; used as the separator when splitting a
# text into words.
SPLIT_WORDS = Regex(r"\s+")
# Matches any character that is not a word character; substituted with
# the empty string to strip punctuation and symbols from a word.
REMOVE_OTHER = Regex(r"[^\w]")
# Matches terms commonly found in stock market messages. The pattern is
# compiled as verbose (whitespace inside it is ignored), case-insensitive
# and multi-line; note that only the first alternative is anchored to the
# start of a line.
STOCK_RE = Regex(r"""
    ^trad(?:e|ing)date|
    company(?:name)?|
    s\w?(?:t\w?o\w?c\w?k|y\w?m(?:\w?b\w?o\w?l)?)|
    t(?:arget|icker)|
    (?:opening|current)p(?:rice)?|
    p(?:rojected|osition)|
    expectations|
    weeks?high|
    marketperformance|
    (?:year|week|month|day|price)(?:target|estimates?)|
    sector|
    r(?:ecommendation|ating)
""", re.I | re.S | re.X | re.M)


class BodyEval(oa.plugins.base.BasePlugin):
    """Plugin exposing eval rules that perform checks on the message body.

    While the message is parsed the plugin collects the word tokens of
    the text/plain and text/html sub-parts of every multipart/alternative
    part, plus some line statistics of the raw text. The eval rules then
    compare these values against the thresholds given in the rule.
    """
    # Names of the methods of this class that are exposed as eval rules.
    eval_rules = (
        "multipart_alternative_difference",
        "multipart_alternative_difference_count",
        "check_blank_line_ratio",
        "tvd_vertical_words",
        "check_stock_info",
    )

    def check_start(self, msg):
        """Initialize the per-message state used by this plugin.

        Stores an empty list that will hold the ids of the sub-parts of
        each multipart/alternative part, and two empty counters that
        will hold the tokens of the plain text and HTML versions.

        :param msg: the message that's being checked
        """
        super(BodyEval, self).check_start(msg)
        self.set_local(msg, "multiparts", [])
        self.set_local(msg, "text_tokens", collections.Counter())
        self.set_local(msg, "html_tokens", collections.Counter())

    def extract_metadata(self, msg, payload, text, part):
        """Parse each part and extract the relevant tokens.

        A multipart/alternative part only registers the ids of its
        sub-parts. A text/plain or text/html part that was registered
        this way has its text split on whitespace, stripped of non-word
        characters and lower-cased; the resulting words are added to
        the matching token counter.

        :param msg: the message that's being checked
        :param payload: passed through to the parent implementation
        :param text: the text of the part; empty parts are skipped
        :param part: the message part being processed
        """
        super(BodyEval, self).extract_metadata(msg, payload, text, part)
        multiparts = self.get_local(msg, "multiparts")
        content_type = part.get_content_type()
        if content_type == "multipart/alternative":
            # The actual parts and text will come after, so only
            # remember the ids of the sub-parts of this multipart.
            subparts = part.get_payload()
            # A malformed container can carry a string (or nothing)
            # instead of a list of sub-parts; there is nothing to
            # register in that case.
            if isinstance(subparts, (list, tuple)):
                multiparts.extend(id(subpart) for subpart in subparts)
            return

        if (id(part) not in multiparts or not text or
                content_type not in ("text/plain", "text/html")):
            # Unknown part or empty part, skip it.
            return

        # Only the two content types checked above can reach this point.
        if content_type == "text/plain":
            tokens = self.get_local(msg, "text_tokens")
        else:
            tokens = self.get_local(msg, "html_tokens")

        words = (REMOVE_OTHER.sub('', word)
                 for word in SPLIT_WORDS.split(text))
        # Words made only of non-word characters are empty by now and
        # are dropped.
        tokens.update(word.lower() for word in words if word)

    def parsed_metadata(self, msg):
        """Compute the values used by the eval rules of this plugin.

        Stores three per-message values:

        * "madiff": the percentage of distinct HTML tokens that are not
          matched by the plain text version (0.0 if there are no HTML
          tokens).
        * "line_count": the number of newlines in the raw text.
        * "blank_line_count": the number of non-overlapping occurrences
          of two consecutive newlines in the raw text.

        :param msg: the message that's being checked
        """
        super(BodyEval, self).parsed_metadata(msg)
        text_tokens = self.get_local(msg, "text_tokens")
        html_tokens = self.get_local(msg, "html_tokens")

        # A token is valid if it appears at least as many times in the
        # text part as in the html part.
        valid_count = sum(
            1 for token, count in text_tokens.items()
            if token in html_tokens and count >= html_tokens[token]
        )
        token_count = len(html_tokens)
        if token_count:
            diff = abs((token_count - valid_count) / token_count * 100)
        else:
            # No HTML tokens at all, there is nothing to compare.
            diff = 0.0
        self.set_local(msg, "madiff", diff)
        self.ctxt.log.debug("Text tokens=%s, HTML tokens=%s, valid count=%s, "
                            "diff=%.2f", len(text_tokens), len(html_tokens),
                            valid_count, diff)

        raw_text = msg.raw_text
        self.set_local(msg, "line_count", raw_text.count("\n"))
        # NOTE(review): this counts non-overlapping "\n\n" pairs, so a
        # run of several blank lines (or CRLF line endings) is under
        # counted. Kept as is because changing it alters rule results.
        self.set_local(msg, "blank_line_count", raw_text.count("\n\n"))

    def multipart_alternative_difference(self, msg, minr, maxr, target=None):
        """Check the difference between the text and html parts of
        the message.

        The difference is the percentage of distinct tokens of the HTML
        version that do not appear at least as many times in the text
        version. If every HTML token is matched this gives a ratio of
        0%. If none of them is matched the ratio is 100%.

        The tokens are extracted from the text handed to
        `extract_metadata` for each part.

        :param msg: the message that's being checked
        :param minr: the inferior value of the threshold
        :param maxr: the superior value of the threshold
        :param target: not used by this rule
        :return: True if the ratio is between the two values
          and False otherwise.

        """
        minr, maxr = float(minr), float(maxr)
        return minr <= self.get_local(msg, "madiff") <= maxr

    def multipart_alternative_difference_count(self, msg, ratio, minhtml,
                                               target=None):
        """Check the ratio of unique tokens seen in the text
        and HTML version of the message. This does not take
        into account how many times each token has been seen
        in each part, but only checks the raw ratio.

        If more tokens are seen in the text version, the ratio
        will be larger than 1.0. If more tokens are seen in the
        HTML version, the ratio will be smaller than 1.0.

        :param msg: the message that's being checked
        :param ratio: the inferior threshold value of the ratio
        :param minhtml: if there are less than this number
          of HTML tokens, the rule won't match
        :param target: not used by this rule
        :return: True if the ratio is larger than the ratio
          parameter and there are at least `minhtml` tokens

        """
        ratio, minhtml = float(ratio), int(minhtml)

        text_count = len(self.get_local(msg, "text_tokens"))
        html_count = len(self.get_local(msg, "html_tokens"))

        # Without any HTML token the ratio is undefined, never match.
        if html_count < minhtml or not html_count:
            return False
        return (text_count / html_count) > ratio

    def check_blank_line_ratio(self, msg, minr, maxr, minlines=1, target=None):
        """Check the ratio of blank lines to the number of
        lines in the message body.

        :param msg: the message that's being checked
        :param minr: the inferior value of the threshold
        :param maxr: the superior value of the threshold
        :param minlines: this rule will match only if the
          message has at least this number of lines.
        :param target: not used by this rule
        :return: True if the ratio is between the two values
          and False otherwise.

        """
        minr, maxr, minlines = float(minr), float(maxr), int(minlines)
        # Requiring at least one line also protects the division below.
        minlines = max(minlines, 1)
        line_count = self.get_local(msg, "line_count")
        blank_line_count = self.get_local(msg, "blank_line_count")

        if line_count < minlines:
            return False
        ratio = blank_line_count / line_count * 100
        return minr <= ratio <= maxr

    # XXX Strange name?
    def tvd_vertical_words(self, msg, minr, maxr, target=None):
        """Check the ratio of spaces to non-spaces.

        :param msg: the message that's being checked
        :param minr: the inferior value of the threshold
        :param maxr: the superior value of the threshold
        :param target: if "rawbody" the raw text of the message is
          checked, otherwise the text of the message is checked
        :return: True if the ratio is between the two values
          and False otherwise (also when the text has no
          non-space characters).

        """
        # Convert the thresholds like the other rules do, they may be
        # given as strings.
        minr, maxr = float(minr), float(maxr)
        if target == "rawbody":
            text = msg.raw_text
        else:
            text = msg.text

        space_count = sum(1 for _ in SPACE_COUNT.finditer(text))
        non_space_count = len(text) - space_count
        if not non_space_count:
            # Empty or whitespace only text, the ratio is undefined.
            return False
        ratio = space_count / non_space_count * 100
        return minr <= ratio <= maxr

    def check_stock_info(self, msg, minwords, target=None):
        """Check the message for common stock market words.

        :param msg: the message that's being checked
        :param minwords: the minimum number of words to be
          found for this rule to match.
        :param target: if "rawbody" the raw text of the message is
          checked, otherwise the text of the message is checked
        :return: True if there are at least `minwords` found
          in the message, and False otherwise.
        """
        minwords = int(minwords)
        if target == "rawbody":
            text = msg.raw_text
        else:
            text = msg.text
        stock_count = sum(1 for _ in STOCK_RE.finditer(text))
        return stock_count >= minwords