SpamExperts/OrangeAssassin

View on GitHub
oa/plugins/auto_learn_threshold.py

Summary

Maintainability
B
6 hrs
Test Coverage
"""Implements the functionality to submit messages for learning when they
fall outside the defined threshold"""
from __future__ import absolute_import

import oa.plugins.base

MIN_BODY_POINTS = 3
MIN_HEADER_POINTS = 3
MIN_BODY_POINTS_LOW_THRESHOLD = -99
MIN_HEADER_POINTS_LOW_THRESHOLD = -99
LEARNER_HAM_POINTS = -1
LEARNER_SPAM_POINTS = 1


class AutoLearnThreshold(oa.plugins.base.BasePlugin):
    options = {
        "bayes_auto_learn_threshold_nonspam": ("float", 0.1),
        "bayes_auto_learn_threshold_spam": ("float", 12.0),
        "bayes_auto_learn_on_error": ("bool", False)
    }

    def valid_tests(self, tests):
        """Yields valid tests for autolearning.
         Exclude all tests flagged with noautolearn, userconf or have a 0 score
        """
        for name, rule in tests.items():
            if not rule.score:
                continue
            tflags = rule.tflags or []
            if set(tflags).intersection(["noautolearn", "userconf"]):
                self.ctxt.log.debug("Skipping test because of matching tflag")
                continue
            yield name, rule

    def bayes_agrees(self, msg):
        """Checks if the bayes plugin considered agrees with the autolearn
        plugin classification
        """
        learner_thinks_spam = self.get_local(msg, "learner_thinks_spam")
        bayes_thinks_spam = self.get_local(msg, "bayes_thinks_spam")
        bayes_thinks_ham = self.get_local(msg, "bayes_thinks_ham")
        if learner_thinks_spam and bayes_thinks_spam:
            return True
        if not learner_thinks_spam and bayes_thinks_ham:
            return True
        return False

    def should_learn(self, msg):
        """Checks if the necessary conditions for learning are met"""
        body_points = self.get_local(msg, "body_points")
        header_points = self.get_local(msg, "header_points")
        learned_points = self.get_local(msg, "learned_points")
        min_body_points = self.get_local(msg, "min_body_points")
        min_header_points = self.get_local(msg, "min_header_points")
        autolearn_forced = self.get_local(msg, "autolearn_forced")

        if self.get_local(msg, "learner_thinks_spam"):
            if header_points < min_header_points:
                self.ctxt.log.debug("not learning, header score: %s < %s",
                                    header_points, min_header_points)
                return False
            if body_points < min_body_points:
                self.ctxt.log.debug("not learning, body score: %s < %s",
                                    body_points, min_body_points)
                return False
            if learned_points < LEARNER_HAM_POINTS:
                self.ctxt.log.debug("not learning, learn score: %s < %s",
                                    learned_points, LEARNER_HAM_POINTS)
                return False
            if msg.score <= self.ctxt.conf['required_score']:
                self.ctxt.log.debug("not learning, msg score: %s < %s",
                                    msg.score, self.ctxt.conf['required_score'])
                return False
        elif self.get_local(msg, "learner_thinks_ham"):
            if learned_points > LEARNER_SPAM_POINTS:
                self.ctxt.log.debug("not learning, learn score: %s > %s",
                                    learned_points, LEARNER_SPAM_POINTS)
                return False
            if msg.score >= self.ctxt.conf['required_score']:
                self.ctxt.log.debug("not learning, msg score: %s >= %s",
                                    msg.score, self.ctxt.conf['required_score'])
                return False
        else:
            self.ctxt.log.debug(
                "not learning, autolearn score between threshold: %s",
                self.get_local(msg, "autolearn_points"))
            return False

        if self['bayes_auto_learn_on_error'] and self.bayes_agrees(msg):
            self.ctxt.log.debug("not learning, bayes agrees with classification")
            return False

        self.ctxt.log.info("Learning: score: %s min:%s, max:%s, forced: %s",
                           self.get_local(msg, "autolearn_points"),
                           self['bayes_auto_learn_threshold_nonspam'],
                           self['bayes_auto_learn_threshold_spam'],
                           autolearn_forced)
        return True

    def prepare_learning_metadata(self, msg, tests):
        """Iterates through the tests and extracts necessary information
        * l:bayes_thinks_spam if the bayes plugin test BAYES_99 is present
        * l:bayes_thinks_ham if the bayes plugin test BAYES_00 is present
        * l:header_points total score for header rules
        * l:body_points total score for header rules
        * l:learned_points total score for tests with the tflag learn
        * l:autolearn_points total score for rules except those with
            noautolearn, userconf, learn tflags
        * g:autolearn_forced if any of the rules had the flag
        * l:learner_thinks_spam if the autolearn_points is over the threshold
        * l:learner_thinks_ham if the autolearn_points is below the threshold
        * g:min_header_points minimum score that header tests must achieve
        * g:min_body_points minimum score that body tests must achieve
        """
        header_points = body_points = learn_points = points = 0
        self.set_local(msg, "bayes_thinks_spam", "BAYES_99" in tests)
        self.set_local(msg, "bayes_thinks_ham", "BAYES_00" in tests)
        autolearn_forced = False
        for name, rule in self.valid_tests(tests):
            tflags = rule.tflags or list()
            if rule.rule_type == 'header':
                header_points += rule.score
            elif rule.rule_type in ('body', 'uri'):
                body_points += rule.score
            elif rule.rule_type == 'meta' and 'net' not in tflags:
                header_points += rule.score
                body_points += rule.score
            if "learn" in tflags:
                learn_points += rule.score
                continue
            autolearn_forced = autolearn_forced or "autolearn_force" in tflags
            points += rule.score
        self.ctxt.log.debug(
            "autolearn_points: %s, "
            "header_poins: %s, "
            "body_points: %s, "
            "learned_points: %s, "
            "autolearn_forced: %s, "
            "ham_threshold: %s, "
            "spam_threshold: %s",
            points, header_points, body_points, learn_points, autolearn_forced,
            self['bayes_auto_learn_threshold_spam'],
            self['bayes_auto_learn_threshold_nonspam']
        )
        self.set_local(msg, "autolearn_points", points)
        self.set_local(msg, "header_points", header_points)
        self.set_local(msg, "body_points", body_points)
        self.set_local(msg, "learned_points", learn_points)
        self.set_local(msg, "autolearn_forced", autolearn_forced)
        self.set_local(msg, "learner_thinks_spam",
                       points >= self['bayes_auto_learn_threshold_spam'])
        self.set_local(msg, "learner_thinks_ham",
                       points < self['bayes_auto_learn_threshold_nonspam'])
        self.set_local(msg, 'autolearn_forced', autolearn_forced)
        if autolearn_forced:
            self.set_local(msg, 'min_body_points',
                           MIN_BODY_POINTS_LOW_THRESHOLD)
            self.set_local(msg, 'min_header_points',
                           MIN_HEADER_POINTS_LOW_THRESHOLD)
        else:
            self.set_local(msg, 'min_body_points', MIN_BODY_POINTS)
            self.set_local(msg, 'min_header_points', MIN_HEADER_POINTS)

    def auto_learn_discriminator(self, ruleset, msg):
        """Decides if a message should be submitted for autolearning
        and submits it
        """
        bayes = self.ctxt.plugins.get("BayesPlugin", None)
        self.prepare_learning_metadata(msg, tests=ruleset.checked)
        if self.should_learn(msg) and bayes is not None:
            if self.get_local(msg, "learner_thinks_spam"):
                bayes.plugin_report(msg)
            elif self.get_local(msg, "learner_thinks_spam"):
                bayes.plugin_revoke(msg)