SpamExperts/OrangeAssassin

View on GitHub
oa/plugins/spam_cop.py

Summary

Maintainability
A
1 hr
Test Coverage
"""SpamCop plugin"""

from __future__ import absolute_import

import os
import re
import pwd
import sys
import time
import random
import smtplib
import email.utils
from email.mime import multipart, text, base

import oa.plugins.base
from oa.regex import Regex


class SpamCopPlugin(oa.plugins.base.BasePlugin):
    options = {
        "dont_report_to_spamcop": ("bool", False),
        "spamcop_from_address": ("str", ""),
        "spamcop_to_address": ("str", "spamassassin-submit@spam.spamcop.net"),
        "spamcop_max_report_size": ("int", 50)
    }

    def _spamcop_report(self, msg):
        """
        Check if message should be reported as spam or not.
        :param msg:
        """
        if not self["dont_report_to_spamcop"]:
            if self.plugin_report(msg):
                self.ctxt.log.debug("Spam reported to SpamCop")
                return True
            else:
                self.ctxt.log.debug("Could not report spam to SpamCop")
        return False

    def get_now_date(self):
        """
        Get actual date in timestamp format.
        """
        time_now = email.utils.formatdate(localtime=True)
        now_date = time.mktime(email.utils.parsedate(time_now))
        return now_date

    def get_mail_date(self, msg):
        """
        Get mail date in timestamp format.
        :param msg:
        """
        received_header = msg.get_decoded_header("Received")[0]
        time_mail = received_header.split(";")[1]
        mail_date = time.mktime(email.utils.parsedate(time_mail))
        return mail_date

    def send_mail_method(self, sender, receiver, message):
        """
        Send mail using smtplib module.
        :param sender:
        :param receiver:
        :param message:
        """
        try:
            regex = Regex(".*@.*").search(receiver)
            domain = regex.group().split('@')[1]
            # return value like '0 mail.domain.com.'
            mx_domain = self.ctxt.dns.query(domain, 'MX')[0].to_text()
            mx_domain = mx_domain.split()[1][:-1]

            smtp_obj = smtplib.SMTP()
            smtp_obj.connect(mx_domain, 587)
            smtp_obj.helo(mx_domain.split('.')[1])
            smtp_obj.sendmail(sender, receiver, message)
            smtp_obj.quit()
        except BaseException:
            self.ctxt.log.warning("SpamCop report failed.")
            return False
        return True

    def plugin_report(self, msg):
        """
        Report spam to "spamcop_to_address". If the message is larger than
        "spamcop_max_report_size", then it will be truncated in report
        message.
        :param msg:
        """
        if not re.match(".+@.+", self["spamcop_to_address"]):
            self.ctxt.log.warning("Missing required value")
            return False
        mail_date = self.get_mail_date(msg)
        now_date = self.get_now_date()
        if not mail_date or mail_date < now_date - 2*86400:
            self.ctxt.log.debug("Message older than 2 days, not reporting")
            return False

        original = msg.raw_msg
        description = "spam report via %s" % sys.version[:5]
        trusted = str(msg.trusted_relays).strip('[]')
        untrusted = str(msg.untrusted_relays).strip('[]')
        host = os.uname()[1] or "unknown"
        user = pwd.getpwuid(os.getuid())[0] or "unknown"

        message = email.mime.multipart.MIMEMultipart()
        message["From"] = self["spamcop_from_address"] or "%s@%s" % (user, host)
        message["To"] = self["spamcop_to_address"]
        message["Subject"] = "report spam"
        message["Date"] = email.utils.formatdate(localtime=True)
        message["Message-Id"] = "<%X.%X@%s>" % (int(now_date),
                                                random.randint(1, 2 ** 32),
                                                host)
        if len(original) > self["spamcop_max_report_size"]*1024:
            x = self["spamcop_max_report_size"]*1024
            original = original[:x] + "\n[truncated by OrangeAssassin]\n"

        message.attach(email.mime.text.MIMEText(
            'This is a multi-part message in MIME format.'))
        original_attachment = email.mime.base.MIMEBase(
            "message", "rfc822; x-spam-type=report")
        original_attachment.add_header("Content-Disposition", "attachment")
        original_attachment.add_header("Content-Description", description)
        original_attachment.add_header("X-Spam-Relays-Trusted", trusted)
        original_attachment.add_header("X-Spam-Relays-Untrusted", untrusted)
        original_attachment.set_payload(original)
        message.attach(original_attachment)

        self.ctxt.log.debug("Sending email to...%s", self["spamcop_to_address"])
        return self.send_mail_method(self["spamcop_from_address"],
                                     self["spamcop_to_address"],
                                     message.as_string())