oa/plugins/free_mail.py
"""FreeMail Plugin
The FreeMail plugin checks the headers for indication that the sender's
domain is that of a site offering free email services.
"""
from builtins import str
import re
import oa.plugins.base
from oa.regex import Regex
# Built-in whitelist of address local parts (role accounts, bounce handlers,
# message-id look-alikes and forwarding artefacts).  The pattern is anchored
# at the start of the address and ends at the "@"; it is compiled in verbose,
# case-insensitive mode.  FreeMail._is_freemail() reports any address that
# matches this pattern as "not freemail", whatever its domain is.
EMAIL_WHITELIST = Regex(r"""
^(?:
abuse|support|sales|info|helpdesk|contact|kontakt
| (?:post|host|domain)master
| undisclosed.* # yahoo.com etc(?)
| request-[a-f0-9]{16} # live.com
| bounced?- # yahoo.com etc
| [a-f0-9]{8}(?:\.[a-f0-9]{8}|-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}) # gmail msgids?
| .+=.+=.+ # gmail forward
)\@
""", re.X | re.I)
# Envelope-sender local parts that indicate bulk or automated mail (list
# bounces, role accounts, "noreply"-style senders, VERP-style "x=y" parts).
# FreeMail.check_freemail_replyto() skips its check when the envelope sender
# matches this pattern and freemail_skip_bulk_envfrom is enabled.
#
# The separator class is written "[._t-]" with the hyphen last so that it is
# a literal: the previous spelling "[._-t]" was parsed as the range "_".."t",
# which left "-" out and therefore failed to recognise "no-reply@...".
SKIP_REPLYTO_FROM = Regex(r"""
(?:
^(?:post|host|domain)master
| ^double-bounce
| ^(?:sentto|owner|return|(?:gr)?bounced?)-.+
| -(?:request|bounces?|admin|owner)
| \b(?:do[._t-]?)?no[._t-]?repl(?:y|ies) # noreply, no-reply, no_reply, donotreply ...
| .+=.+
)\@
""", re.X | re.I)
class FreeMail(oa.plugins.base.BasePlugin):
    """Eval rules that detect addresses hosted on free e-mail providers.

    Every ``check_freemail_*`` rule returns a description string when it
    hits and ``False`` otherwise.  State shared between the rules of one
    message (compiled regexes, addresses found in the body) is kept through
    ``get_global``/``set_global`` and is (re)initialised in ``check_start``.
    """

    eval_rules = (
        "check_freemail_replyto",
        "check_freemail_from",
        "check_freemail_header",
        "check_freemail_body"
    )
    options = {
        "freemail_max_body_emails": ("int", 5),
        "freemail_max_body_freemails": ("int", 3),
        "freemail_skip_when_over_max": ("bool", True),
        "freemail_skip_bulk_envfrom": ("bool", True),
        "freemail_add_describe_email": ("bool", True),
        "freemail_domains": ("append_split", []),
        "freemail_whitelist": ("append_split", []),
        "util_rb_tld": ("append_split", []),
        "util_rb_2tld": ("append_split", []),
        "util_rb_3tld": ("append_split", [])
    }

    def check_start(self, msg):
        """Validate the configured domains and set up the per-message state.

        Invalid ``freemail_domains`` entries are dropped (with a warning).
        Entries containing the wildcards ``*`` or ``?`` are additionally
        compiled into one regex stored as ``freemail_domains_re``.  The
        address-extraction regex ``email_re`` is built from the configured
        TLD lists, and ``body_emails`` / ``check_if_parsed`` are reset.
        """
        domain_re = Regex(r'^[a-z0-9.*?-]+$')
        freemail_domains = self.get_global('freemail_domains')
        wildcard_patterns = []
        # Iterate over a copy: invalid entries are removed from the list.
        for domain in freemail_domains[:]:
            if not domain_re.search(domain):
                freemail_domains.remove(domain)
                self.ctxt.log.warn(
                    "FreeMail::Plugin Invalid freemail domain: %s", domain)
                # A rejected entry must not leak into the wildcard regex.
                continue
            if '*' in domain or '?' in domain:
                # Escape literal dots first, then translate the wildcards.
                pattern = domain.replace('.', r'\.')
                pattern = pattern.replace('?', '.')
                pattern = pattern.replace('*', '[^.]*')
                wildcard_patterns.append(pattern)
        if wildcard_patterns:
            wild_doms = r'\@(?:{0})$'.format('|'.join(wildcard_patterns))
            self.set_global('freemail_domains_re', Regex(wild_doms))
        self.set_global('freemail_domains', freemail_domains)

        valid_tlds = (self.get_global('util_rb_tld') +
                      self.get_global('util_rb_2tld') +
                      self.get_global('util_rb_3tld'))
        # TLDs are literal text; escape them so that the dot in multi-label
        # entries such as "co.uk" does not match an arbitrary character.
        tlds_re = r'(?:{0})'.format(
            "|".join(re.escape(tld) for tld in valid_tlds))
        email_re = Regex(r"""
            (?=.{{0,64}}\@) # limit userpart to 64 chars (and speed up searching?)
            (?<![a-z0-9!#\$%&'*+\/=?^_`{{|}}~-]) # start boundary
            ( # capture email
            [a-z0-9!#\$%&'*+\/=?^_`{{|}}~-]+ # no dot in beginning
            (?:\.[a-z0-9!#\$%&'*+\/=?^_`{{|}}~-]+)* # no consecutive dots, no ending dot
            \@
            (?:[a-z0-9](?:[a-z0-9-]{{0,59}}[a-z0-9])?\.){{1,4}} # max 4x61 char parts (should be enough?)
            {tld} # ends with valid tld
            )
            (?!(?:[a-z0-9-]|\.[a-z0-9])) # make sure domain ends here
        """.format(tld=tlds_re), re.X | re.I)
        self.set_global('email_re', email_re)
        self.set_global('body_emails', set())
        self.set_global("check_if_parsed", False)

    def extract_metadata(self, msg, payload, text, part):
        """Collect every address found in a text/plain or text/html part.

        The addresses are added to the ``body_emails`` global set; parts of
        any other content type are ignored.
        """
        if part.get_content_type() not in ("text/plain", "text/html"):
            return
        body_emails = self.get_global('body_emails')
        body_emails.update(
            self.get_global('email_re').findall(part.get_payload()))
        self.set_global('body_emails', body_emails)

    def check_freemail_replyto(self, msg, option=None, target=None):
        """Compare the freemail reply address with From: and the body.

        Possible options:
        - replyto  From: or body address is different than Reply-To
                   (this is the default)
        - reply    as above, but if no Reply-To address is found,
                   compares From: and body

        Returns a description string on a hit, ``False`` otherwise.
        """
        get_global = self.get_global
        if option and option not in ('replyto', 'reply'):
            self.ctxt.log.warn("FreeMail::Plugin check_freemail_replyto "
                               "invalid option: %s", option)
            return False
        if not option:
            option = 'replyto'

        if get_global('freemail_skip_bulk_envfrom'):
            envelope_sender = msg.sender_address
            # The envelope sender may be missing; only a present address
            # can look like a bulk sender.
            if envelope_sender and SKIP_REPLYTO_FROM.search(envelope_sender):
                self.ctxt.log.warn(
                    "FreeMail::Plugin check_freemail_replyto "
                    "envelope sender looks bulk skipping check: %s",
                    envelope_sender)
                return False

        from_email = self._header_email(msg, 'From')
        reply_to = self._header_email(msg, 'Reply-To')
        from_email_frm = self._is_freemail(from_email)
        reply_to_frm = self._is_freemail(reply_to)

        if from_email_frm and reply_to_frm and from_email != reply_to:
            self.ctxt.log.warn(
                "FreeMail::Plugin check_freemail_replyto "
                "HIT! From and Reply-To are different freemails")
            return str("From and Reply-To are different freemails")

        if option == 'replyto' and not reply_to_frm:
            self.ctxt.log.warn("FreeMail::Plugin check_freemail_replyto "
                               "Reply-To is not freemail, skipping check")
            return False
        if option == 'reply':
            if reply_to and not reply_to_frm:
                self.ctxt.log.warn(
                    "FreeMail::Plugin check_freemail_replyto "
                    "Reply-To is defined but is not freemail, "
                    "skipping check")
                return False
            # Only fall back to From: when there is no Reply-To at all; a
            # freemail Reply-To must still be compared with the body.
            if not reply_to and not from_email_frm:
                self.ctxt.log.warn(
                    "FreeMail::Plugin check_freemail_replyto "
                    "No Reply-To and From is not freemail, "
                    "skipping check")
                return False

        if not self._parse_body():
            return False

        # With 'replyto' the Reply-To is known to be freemail at this point,
        # so this single expression covers both options.
        check = reply_to if reply_to_frm else from_email
        for email in get_global("freemail_body_emails"):
            if email == check:
                continue
            self.ctxt.log.warn("FreeMail::Plugin check_freemail_replyto "
                               "HIT! %s and %s are different freemails",
                               check, email)
            return self._with_description(
                "Different freemails in reply header and body",
                check + " " + email)
        return False

    def check_freemail_from(self, msg, regex=None, target=None):
        """Check all possible 'from' headers to see if sender is freemail.

        The addresses are taken from ``msg.get_all_from_headers_addr()``.
        When ``regex`` is given, a freemail address only counts if it also
        matches that regex.

        Returns a description string on a hit, ``False`` otherwise.
        """
        self.ctxt.log.debug("FreeMail::Plugin Eval rule check_freemail_from"
                            " %s", 'with regex: ' + regex if regex else '')
        all_from_headers = ['From', 'Envelope-Sender',
                            'Resent-Sender', 'X-Envelope-From',
                            'EnvelopeFrom', 'Resent-From']
        compiled_ok, check_re = self._compile_rule_regex(
            regex, "check_freemail_from")
        if not compiled_ok:
            return False
        header_emails = sorted(set(msg.get_all_from_headers_addr()))
        if not header_emails:
            self.ctxt.log.debug("FreeMail::Plugin check_freemail_from"
                                " no emails found in from headers: %s",
                                all_from_headers)
            return False
        for email in header_emails:
            if not self._is_freemail(email):
                continue
            if check_re:
                if not check_re.search(email):
                    # Keep looking: another sender address may still match.
                    continue
                self.ctxt.log.debug(
                    "FreeMail::Plugin check_freemail_from"
                    " HIT! %s is freemail and matches regex", email)
                result = "Sender address is freemail and matches regex"
            else:
                self.ctxt.log.debug("FreeMail::Plugin check_freemail_from"
                                    " HIT! %s is freemail", email)
                result = "Sender address is freemail"
            return self._with_description(result, email)
        return False

    def check_freemail_header(self, msg, header, regex=None, target=None):
        """Check if the header given as parameter holds a freemail address.

        When ``regex`` is given, a freemail address only counts if it also
        matches that regex.

        Returns a description string on a hit, ``False`` otherwise.
        """
        self.ctxt.log.debug("FreeMail::Plugin check_freemail_header"
                            " %s", 'with regex: ' + regex if regex else '')
        if not header:
            self.ctxt.log.warn("FreeMail::Plugin check_freemail_header"
                               " requires an argument")
            return False
        compiled_ok, check_re = self._compile_rule_regex(
            regex, "check_freemail_header")
        if not compiled_ok:
            return False
        if not msg.msg.get(header, None):
            self.ctxt.log.debug("FreeMail::Plugin check_freemail_header"
                                " header: %s not found", header)
            return False
        header_emails = self.get_global('email_re').findall(msg.msg[header])
        if not header_emails:
            self.ctxt.log.debug("FreeMail::Plugin check_freemail_header"
                                " no emails found in header: %s", header)
            return False
        for email in header_emails:
            if not self._is_freemail(email):
                continue
            if check_re:
                if not check_re.search(email):
                    # Keep looking: a later address may still match.
                    continue
                self.ctxt.log.debug(
                    "FreeMail::Plugin check_freemail_header"
                    " HIT! %s is freemail and matches regex", email)
                result = ("Header " + header +
                          " is freemail and matches regex")
            else:
                self.ctxt.log.debug("FreeMail::Plugin check_freemail_header"
                                    " HIT! %s is freemail", email)
                result = "Header " + header + " is freemail"
            return self._with_description(result, email)
        return False

    def check_freemail_body(self, msg, regex=None, target=None):
        """Check if there are freemail addresses in the body parts.

        When ``regex`` is given, the rule hits on the first freemail body
        address matching it; otherwise it hits when any freemail address
        was found in the body.

        Returns a description string on a hit, ``False`` otherwise.
        """
        self.ctxt.log.debug("FreeMail::Plugin check_freemail_body"
                            " %s", 'with regex: ' + regex if regex else '')
        if not self.get_global('body_emails'):
            self.ctxt.log.debug("FreeMail::Plugin check_freemail_body "
                                "No emails found in body of the message")
            return False
        compiled_ok, check_re = self._compile_rule_regex(
            regex, "check_freemail_body")
        if not compiled_ok:
            return False
        if not self._parse_body():
            return False
        freemails = self.get_global("freemail_body_emails")
        if check_re:
            for email in freemails:
                if check_re.search(email):
                    self.ctxt.log.debug(
                        "FreeMail::Plugin check_freemail_body"
                        " HIT! %s is freemail and matches regex", email)
                    return self._with_description(
                        "Address from body is freemail and matches regex",
                        email)
        elif freemails:
            emails = ", ".join(freemails)
            self.ctxt.log.debug("FreeMail::Plugin check_freemail_body"
                                " HIT! body has freemails: %s", emails)
            return self._with_description("Body has freemails", emails)
        return False

    def _header_email(self, msg, header):
        """Return the first address found in ``header``, or '' if none."""
        try:
            return self.get_global(
                'email_re').search(msg.msg[header]).group()
        except (AttributeError, TypeError, KeyError):
            # Header missing, not a string, or no address in it.
            return ''

    def _compile_rule_regex(self, regex, rule_name):
        """Compile the optional rule regex.

        Returns ``(True, compiled)`` on success, ``(True, None)`` when no
        regex was given and ``(False, None)`` after logging a warning when
        the pattern is invalid.
        """
        if not regex:
            return True, None
        try:
            # Compile eagerly so that a bad pattern is reported here and
            # not at the first search.
            return True, Regex(regex).compile()
        except re.error:
            self.ctxt.log.warn("FreeMail::Plugin %s regex error", rule_name)
            return False, None

    def _with_description(self, result, addresses):
        """Append the obfuscated address(es) to ``result`` if configured."""
        if self["freemail_add_describe_email"]:
            result = (result + "\n\t" +
                      "(" + addresses.replace("@", "[at]") + ")")
        return str(result)

    def _parse_body(self):
        """Select the freemail addresses among the collected body addresses.

        Returns ``False`` when ``freemail_skip_when_over_max`` is enabled
        and either the number of unique body addresses or the number of
        freemail addresses among them reaches its configured maximum.
        Otherwise stores the result in ``freemail_body_emails``, marks the
        body as parsed and returns ``True``.
        """
        get_global = self.get_global
        if get_global("check_if_parsed"):
            return True
        body_emails = get_global('body_emails')
        skip_over_max = get_global("freemail_skip_when_over_max")
        if (skip_over_max and
                len(body_emails) >= get_global("freemail_max_body_emails")):
            self.ctxt.log.debug("FreeMail::Plugin check_freemail_body "
                                "too many unique emails found in body")
            return False
        max_freemails = get_global("freemail_max_body_freemails")
        freemail_body_emails = []
        # Sorted so that the address reported by the rules does not depend
        # on set iteration order.
        for email in sorted(body_emails):
            if not self._is_freemail(email):
                continue
            freemail_body_emails.append(email)
            if skip_over_max and len(freemail_body_emails) >= max_freemails:
                self.ctxt.log.debug(
                    "FreeMail::Plugin check_freemail_body "
                    "too many unique free emails found in body")
                return False
        self.set_global("freemail_body_emails", freemail_body_emails)
        self.set_global("check_if_parsed", True)
        return True

    def _is_freemail(self, email):
        """Check if the email's domain is in the freemail_domains list.

        Whitelisted addresses or domains (``freemail_whitelist`` and the
        built-in ``EMAIL_WHITELIST``) are never freemail.  Domain matching
        is case-insensitive.  Empty values and values without a domain
        part return ``False``.
        """
        if not email:
            return False
        # Split on the last "@" only; tolerate values without any "@".
        _, at_sign, original_domain = email.rpartition('@')
        if not at_sign or not original_domain:
            return False
        lowered_email = email.lower()
        email_domain = original_domain.lower()
        try:
            freemail_re = self.get_global('freemail_domains_re')
        except KeyError:
            # No wildcard domains were configured.
            freemail_re = None
        freemail_whitelist = self.get_global('freemail_whitelist')
        freemail_domains = self.get_global('freemail_domains')
        # The as-written spelling is checked too, so whitelist entries
        # containing upper-case characters keep working.
        if email in freemail_whitelist or lowered_email in freemail_whitelist:
            self.ctxt.log.warn(
                "FreeMail::Plugin whitelisted email: %s", email)
            return False
        if (original_domain in freemail_whitelist or
                email_domain in freemail_whitelist):
            self.ctxt.log.warn(
                "FreeMail::Plugin whitelisted domain: %s", email_domain)
            return False
        if EMAIL_WHITELIST.search(email):
            self.ctxt.log.warn(
                "FreeMail::Plugin whitelisted domain, default: %s",
                email_domain)
            return False
        if email_domain in freemail_domains:
            return True
        return bool(freemail_re and freemail_re.search(lowered_email))