# oa/message.py
"""Internal representation of email messages."""
from builtins import str
from builtins import set
from builtins import list
from builtins import dict
from builtins import object
import re
import time
import email
import hashlib
import calendar
import functools
import ipaddress
import email.utils
import html.parser
import collections
import email.header
import email.errors
import email.mime.base
import email.mime.text
import email.feedparser
import email.mime.multipart
from future.utils import PY3
import oa
import oa.context
from oa.received_parser import ReceivedParser
from oa.rules.ruleset import RuleSet
from oa.regex import Regex
# Verbose-mode pattern for URLs found in text parts.  It captures either an
# explicit http/https/ftp URL, or a scheme-less one that starts with "www.x"
# or "ftp.x".  Used by Message._parse_message to fill Message.uri_list.
URL_RE = Regex(r"""
(
\b # the preceding character must not be alphanumeric
(?:
(?:
(?:https? | ftp) # capture the protocol
:// # skip the boilerplate
)|
(?= ftp\.[^\.\s<>"'\x7f-\xff] )| # allow the protocol to be missing,
(?= www\.[^\.\s<>"'\x7f-\xff] ) # but only if the rest of the url
# starts with "www.x" or "ftp.x"
)
(?:[^\s<>"'\x7f-\xff]+) # capture the guts
)
""", re.VERBOSE)
# A run of at least seven hex digits, dots and colons, preceded by one of
# "[", " ", "(" and followed by one of "]", " ", newline, ";", ")".
# NOTE(review): presumably meant to spot IPv4/IPv6 literals; it is not
# referenced anywhere in the code visible here.
IPFRE = Regex(r"[\[ \(]{1}[a-fA-F\d\.\:]{7,}?[\] \n;\)]{1}")
# Charset names for which Message._iter_parts decodes the payload with
# errors="strict" instead of errors="ignore".
STRICT_CHARSETS = frozenset(("quopri-codec", "quopri", "quoted-printable",
                             "quotedprintable"))
class _ParseHTML(html.parser.HTMLParser):
"""Extract data from HTML parts."""
def __init__(self, collector):
try:
html.parser.HTMLParser.__init__(self, convert_charrefs=False)
except TypeError:
# Python 2 does not have the convert_charrefs argument.
html.parser.HTMLParser.__init__(self)
self.reset()
self.collector = collector
def handle_data(self, data):
"""Keep track of the data."""
data = data.strip()
if data:
self.collector.append(data)
class _Headers(collections.defaultdict):
"""Like a defaultdict that returns an empty list by default, but the
keys are all case insensitive.
"""
def __init__(self):
collections.defaultdict.__init__(self, list)
def get(self, k, d=None):
return super(_Headers, self).get(k.lower(), d)
def __setitem__(self, key, value):
super(_Headers, self).__setitem__(key.lower(), value)
def __getitem__(self, key):
return super(_Headers, self).__getitem__(key.lower())
def __contains__(self, key):
return super(_Headers, self).__contains__(key.lower())
class _memoize(object):
    """Decorator factory that caches a method's result on the instance.

    The cache is the attribute of the instance named by ``cache_name``;
    it is used to prevent superfluous parsing of headers.
    """

    def __init__(self, cache_name):
        self._cache_name = cache_name

    def __call__(self, func):
        """Wrap *func* so that its result is looked up in the cache first
        and only computed (and then stored) when it is not there.
        """
        @functools.wraps(func)
        def wrapped_func(fself, name):
            # Imported on every call, inside the function rather than at
            # module level.
            from oa.config import LAZY_MODE
            if not LAZY_MODE:
                store = getattr(fself, self._cache_name)
                cached = store.get(name)
                if cached is not None:
                    return cached
                computed = func(fself, name)
                store[name] = computed
                return computed
            # In lazy mode the cache is bypassed completely.
            return func(fself, name)
        return wrapped_func
# Headers tried, in this order, by Message._parse_sender when the
# "envelope_sender_header" configuration value is empty.
DEFAULT_SENDERH = (
    "X-Sender",
    "X-Envelope-From",
    "Envelope-Sender",
    "Return-Path",
)
class Message(oa.context.MessageContext):
"""Internal representation of an email message. Used for rule matching."""
def __init__(self, global_context, raw_msg):
    """Parse the message, extract and decode all headers and all
    text parts.

    :param global_context: passed unchanged to the parent class
      constructor.
    :param raw_msg: the complete message as a string; CRLF and CR line
      endings are converted to LF before it is parsed.
    """
    # Parsing defect flags, switched on by _parse_message().
    self.missing_boundary_header = False
    self.missing_header_body_separator = False
    super(Message, self).__init__(global_context)
    self.raw_msg = self.translate_line_breaks(raw_msg)
    self.msg = email.message_from_string(self.raw_msg)
    # Header stores; _Headers makes every key case insensitive.  The
    # "headers", "addr_headers", "name_headers" and "mime_headers" stores
    # double as the caches named in the @_memoize decorators below.
    self.headers = _Headers()
    self.raw_headers = _Headers()
    self.addr_headers = _Headers()
    self.name_headers = _Headers()
    self.mime_headers = _Headers()
    self.received_headers = list()
    self.raw_mime_headers = _Headers()
    self.header_ips = _Headers()
    # Decoded text (Subject values plus text parts, joined by spaces) and
    # the undecorated text payloads (joined by newlines).
    self.text = ""
    self.raw_text = ""
    # URLs found in the text parts.
    self.uri_list = set()
    # Rule matching state; reset by clear_matches().
    self.score = 0
    self.rules_checked = dict()
    self.interpolate_data = dict()
    self.rules_descriptions = dict()
    self.plugin_tags = dict()
    # Data
    self.sender_address = ""
    # (rdns, ip) pairs, one per parsed Received header.
    self.hostname_with_ip = list()
    # Relay classification, filled in by _parse_relays().
    self.internal_relays = []
    self.external_relays = []
    self.last_internal_relay_index = 0
    self.last_trusted_relay_index = 0
    self.trusted_relays = []
    self.untrusted_relays = []
    # Everything above must exist before parsing starts, because
    # _parse_message() populates these attributes.
    self._parse_message()
    # NOTE(review): _hook_parsed_metadata is not defined in this file;
    # presumably inherited from oa.context.MessageContext.
    self._hook_parsed_metadata()
def clear_matches(self):
    """Forget every rule result recorded so far and zero the score."""
    self.score = 0
    self.rules_checked = dict()
@staticmethod
def translate_line_breaks(text):
    """Return *text* with every CRLF and lone CR replaced by LF."""
    # CRLF has to be handled first, otherwise it would become two LFs.
    return text.replace("\r\n", "\n").replace("\r", "\n")
@staticmethod
def normalize_html_part(payload):
    """Strip all HTML tags.

    :param payload: the decoded HTML text.
    :return: a list with the stripped, non-empty pieces of text found
      between the tags.  If the parser gives up part-way, the pieces
      collected up to that point are returned.
    """
    data = list()
    stripper = _ParseHTML(data)
    # HTMLParseError no longer exists in html.parser on Python 3.5+.
    # Naming it directly in the except clause would raise AttributeError
    # as soon as feed() raised anything, so it is looked up defensively.
    parse_errors = (UnicodeDecodeError,)
    html_parse_error = getattr(html.parser, "HTMLParseError", None)
    if html_parse_error is not None:
        parse_errors += (html_parse_error,)
    try:
        stripper.feed(payload)
    except parse_errors:
        # We can't parse the HTML, so just strip it. This is still
        # better than including generic HTML/CSS text.
        pass
    return data
@staticmethod
def _decode_header(header):
"""Decodes an email header and returns it as a string. Any parts of
the header that cannot be decoded are simply ignored.
"""
parts = list()
try:
decoded_header = email.header.decode_header(header)
except (ValueError, email.header.HeaderParseError):
return
for value, encoding in decoded_header:
if encoding:
try:
parts.append(value.decode(encoding, "ignore"))
except (LookupError, UnicodeError, AssertionError):
continue
else:
try:
parts.append(value.decode("utf-8", "ignore"))
except AttributeError:
parts.append(value)
return "".join(parts)
def get_raw_header(self, header_name):
    """Return the list of raw values stored for *header_name*, or an
    empty list when the message has no such header.
    """
    # Nothing is parsed here; the raw headers are collected while the
    # message itself is parsed.
    fallback = list()
    return self.raw_headers.get(header_name, fallback)
def get_headers(self, header_name):
    """Return the values kept under *header_name* in ``self.headers``
    (headers added by plugins), or an empty list if there are none.
    """
    fallback = list()
    return self.headers.get(header_name, fallback)
@_memoize("headers")
def get_decoded_header(self, header_name):
    """Return the decoded values of *header_name*.

    The decoded raw headers come first, followed by whatever
    get_headers() returns for the same name.
    """
    decoded = list()
    decoded.extend(self._decode_header(raw)
                   for raw in self.get_raw_header(header_name))
    decoded.extend(self.get_headers(header_name))
    return decoded
def get_untrusted_ips(self):
    """Return the header IPs that are not part of the trusted networks
    configured by the user.

    :return: A list of `ipaddress.ip_address`.
    """
    untrusted = []
    for candidate in self.get_header_ips():
        if candidate in self.ctxt.networks.trusted:
            continue
        untrusted.append(candidate)
    return untrusted
def get_header_ips(self):
    """Return the IP of every parsed Received header.

    Headers whose "ip" value is empty are skipped, and the value is
    coerced with str() before conversion, the same way _parse_relays()
    treats relay IPs.  An empty value would otherwise make
    ipaddress.ip_address() raise ValueError.

    :return: A list of `ipaddress.ip_address`.
    """
    values = list()
    for header in self.received_headers:
        raw_ip = header["ip"]
        if not raw_ip:
            continue
        values.append(ipaddress.ip_address(str(raw_ip)))
    return values
@_memoize("addr_headers")
def get_addr_header(self, header_name):
    """Get a list of the first addresses from this header.

    For every decoded value of *header_name* at most one address is
    collected: the first non-empty one that email.utils.getaddresses()
    finds in that value.
    """
    values = list()
    for value in self.get_decoded_header(header_name):
        for dummy, addr in email.utils.getaddresses([value]):
            if addr:
                values.append(addr)
                # NOTE(review): indentation was lost in this file; the
                # break is assumed to belong to the "if", i.e. stop at
                # the first non-empty address.  Confirm.
                break
    return values
def get_all_addr_header(self, header_name):
    """Return every non-empty address found in the decoded values of
    *header_name*, in the order in which they appear.
    """
    found = list()
    for decoded in self.get_decoded_header(header_name):
        pairs = email.utils.getaddresses([decoded])
        found.extend(address for _name, address in pairs if address)
    return found
def get_all_from_headers_addr(self):
    """Yield the addresses of all the sender-like headers.

    When an envelope sender is known it is yielded in place of the
    addresses of the 'EnvelopeFrom' header.
    """
    envelope_sender = self.sender_address
    header_names = ['From', 'Envelope-Sender',
                    'Resent-Sender', 'X-Envelope-From',
                    'EnvelopeFrom', 'Resent-From']
    for header_name in header_names:
        if envelope_sender and header_name == 'EnvelopeFrom':
            yield envelope_sender
            continue
        for address in self.get_all_addr_header(header_name):
            yield address
@_memoize("name_headers")
def get_name_header(self, header_name):
    """Get a list of the first names from this header.

    For every decoded value of *header_name* at most one display name
    is collected: the first non-empty one that
    email.utils.getaddresses() finds in that value.
    """
    values = list()
    for value in self.get_decoded_header(header_name):
        for name, dummy in email.utils.getaddresses([value]):
            if name:
                values.append(name)
                # NOTE(review): indentation was lost in this file; the
                # break is assumed to belong to the "if", i.e. stop at
                # the first non-empty name.  Confirm.
                break
    return values
def get_raw_mime_header(self, header_name):
    """Return the list of raw MIME header values stored for
    *header_name*, or an empty list when there are none.
    """
    # Nothing is parsed here; the raw MIME headers are collected while
    # the message itself is parsed.
    fallback = list()
    return self.raw_mime_headers.get(header_name, fallback)
@_memoize("mime_headers")
def get_decoded_mime_header(self, header_name):
    """Return the decoded values of the MIME header *header_name*."""
    decoded = list()
    decoded.extend(self._decode_header(raw)
                   for raw in self.get_raw_mime_header(header_name))
    return decoded
def iter_decoded_headers(self):
    """Walk over every decoded header of the message.

    Yields strings like "<header_name>: <header_value>"
    """
    for name in self.raw_headers:
        decoded_values = self.get_decoded_header(name)
        for decoded in decoded_values:
            line = "%s: %s" % (name, decoded)
            yield line
def _create_plugin_tags(self, header):
for key, value in header.items():
self.plugin_tags[key.upper()] = value
def _parse_sender(self):
    """Extract the envelope sender from the message.

    The "envfrom" value of the relays is tried first; if none of them
    supplies a sender, the configured "envelope_sender_header" headers
    (or DEFAULT_SENDERH when that setting is empty) are tried in order.
    The first non-empty value found is stripped and stored in
    ``self.sender_address``; if nothing is found the attribute is left
    untouched.
    """
    # NOTE(review): indentation was lost in this file.  The "else" is
    # assumed to pair with "if self.external_relays" and to contain both
    # the trusted and the untrusted lookups -- confirm against history.
    always_trust_envelope_from = self.ctxt.conf[
        'always_trust_envelope_sender']
    headers = self.ctxt.conf["envelope_sender_header"] or DEFAULT_SENDERH
    if self.external_relays:
        # First external relay.
        sender = self.external_relays[0].get("envfrom")
        if sender:
            self.sender_address = sender.strip()
            return
    else:
        if self.trusted_relays and not always_trust_envelope_from:
            # Last entry of the trusted relays.
            sender = self.trusted_relays[-1].get("envfrom")
            if sender:
                self.sender_address = sender.strip()
                return
        if self.untrusted_relays:
            sender = self.untrusted_relays[0].get("envfrom")
            if sender:
                self.sender_address = sender.strip()
                return
    # No relay supplied an envelope sender: fall back to the headers.
    for sender_header in headers:
        try:
            sender = self.get_addr_header(sender_header)[0]
        except IndexError:
            # This header is absent or holds no address.
            continue
        if sender:
            self.sender_address = sender.strip()
            self.ctxt.log.debug("Using %s as sender: %s",
                                sender_header, sender)
            return
    return
def _parse_relays(self, relays):
    """Walk through a relays list to extract the
    [un]trusted/internal/external relays.

    Every relay dict gets an "msa" and an "intl" key and is appended to
    one of the internal/external lists and to one of the
    trusted/untrusted lists.  Afterwards the RELAYS* (and, when there
    are external relays, LASTEXTERNAL*) plugin tags are created.

    :param relays: the list of parsed Received headers (dicts).
    """
    # NOTE(review): indentation was lost in this file.  The nesting of
    # the else/elif branches below is a best-effort reconstruction
    # ("elif" paired with "if self.ctxt.networks.configured") -- confirm.
    #
    # The flags below are only ever switched off inside the loop, never
    # back on, so once a relay is external/untrusted all the following
    # ones are too.
    is_trusted = True
    is_internal = True
    found_msa = False
    for position, relay in enumerate(relays):
        relay['msa'] = 0
        if relay['ip']:
            ip = ipaddress.ip_address(str(relay['ip']))
            in_internal = ip in self.ctxt.networks.internal
            in_trusted = ip in self.ctxt.networks.trusted
            in_msa = ip in self.ctxt.networks.msa
            has_auth = relay.get("auth", None)
            if is_trusted and not found_msa:
                if self.ctxt.networks.configured:
                    if not in_trusted and not has_auth:
                        is_trusted = False
                        is_internal = False
                    else:
                        if is_internal and not has_auth and not in_internal:
                            is_internal = False
                        if in_msa:
                            relay['msa'] = 1
                            found_msa = True
                elif not ip.is_private and not has_auth:
                    # No networks configured: a non-private,
                    # unauthenticated IP ends the trusted/internal run.
                    is_internal = False
                    is_trusted = False
        relay['intl'] = int(is_internal)
        if is_internal:
            self.internal_relays.append(relay)
            self.last_internal_relay_index = position
        else:
            self.external_relays.append(relay)
        if is_trusted:
            self.trusted_relays.append(relay)
            self.last_trusted_relay_index = position
        else:
            self.untrusted_relays.append(relay)
    # Each relay is rendered with this template; every relay dict must
    # therefore provide all the keys named in it.
    tag_template = ("[ ip={ip} rdns={rdns} helo={helo} by={by} "
                    "ident={ident} envfrom={envfrom} intl={intl} id={id} auth={auth} "
                    "msa={msa} ]")
    relays_tags = {
        "RELAYSTRUSTED": " ".join([tag_template.format(**x)
                                   for x in self.trusted_relays]),
        "RELAYSUNTRUSTED": " ".join([tag_template.format(**x)
                                     for x in self.untrusted_relays]),
        "RELAYSINTERNAL": " ".join([tag_template.format(**x)
                                    for x in self.internal_relays]),
        "RELAYSEXTERNAL": " ".join([tag_template.format(**x)
                                    for x in self.external_relays]),
    }
    if self.external_relays:
        # Taken from the last entry of the external relays list.
        relays_tags.update({
            "LASTEXTERNALIP": self.external_relays[-1]['ip'],
            "LASTEXTERNALRDNS": self.external_relays[-1]['rdns'],
            "LASTEXTERNALHELO": self.external_relays[-1]['helo']
        })
    self._create_plugin_tags(relays_tags)
def _parse_message(self):
    """Parse the message.

    Collects the raw (MIME) headers, builds ``self.text`` and
    ``self.raw_text`` from the Subject and the text parts, gathers the
    URLs of the text parts, then parses the Received headers into
    relays and works out the envelope sender.
    """
    # NOTE(review): _hook_check_start and _hook_extract_metadata are not
    # defined in this file; presumably inherited from MessageContext.
    self._hook_check_start()
    # Scan the raw header block: the first line that does not look like
    # a header ends it.  If that line is not blank, the header/body
    # separator defined by the RFC is missing.
    for line in self.raw_msg.splitlines():
        if not email.feedparser.headerRE.match(line):
            if line.strip():
                self.missing_header_body_separator = True
            break
    for name, raw_value in self.msg._headers:
        self.raw_headers[name].append(raw_value)
    # XXX This is strange, but it's what SA does.
    # The body starts with the Subject header(s)
    body = list(self.get_decoded_header("Subject"))
    raw_body = list()
    for payload, part in self._iter_parts(self.msg):
        if not part._headers:
            self.missing_boundary_header = True
        # Extract any MIME headers
        for name, raw_value in part._headers:
            self.raw_mime_headers[name].append(raw_value)
        text = None
        if payload is not None:
            # this must be a text part
            self.uri_list.update(set(URL_RE.findall(payload)))
            if part.get_content_subtype() == "html":
                text = self.normalize_html_part(payload.replace("\n", " "))
                text = " ".join(text)
                body.append(text)
                raw_body.append(payload)
            else:
                text = payload.replace("\n", " ")
                body.append(text)
                raw_body.append(payload)
        self._hook_extract_metadata(payload, text, part)
    self.text = " ".join(body)
    self.raw_text = "\n".join(raw_body)
    # Work on a copy: get_decoded_header() is memoized, so extending its
    # return value in place would add the synthetic X-ORIGINATING-IP
    # lines to the cached "Received" values.
    received_headers = list(self.get_decoded_header("Received"))
    for header in self.ctxt.conf["originating_ip_headers"]:
        headers = ["X-ORIGINATING-IP: %s" % x
                   for x in self.get_decoded_header(header)]
        received_headers.extend(headers)
    received_obj = ReceivedParser(received_headers)
    self.received_headers = received_obj.received
    self._parse_relays(self.received_headers)
    self._parse_sender()
    try:
        # The fields of the first parsed Received header become tags.
        self._create_plugin_tags(self.received_headers[0])
    except IndexError:
        # No Received headers at all.
        pass
    for header in self.received_headers:
        self.hostname_with_ip.append((header["rdns"], header["ip"]))
@staticmethod
def _iter_parts(msg):
"""Extract and decode the text parts from the parsed email message.
For non-text parts the payload will be None.
Yields (payload, part)
"""
for part in msg.walk():
if part.get_content_maintype() == "text":
payload = part.get_payload(decode=True)
charset = part.get_content_charset()
errors = "ignore"
if not charset:
charset = "ascii"
elif charset.lower().replace("_", "-") in STRICT_CHARSETS:
errors = "strict"
try:
payload = payload.decode(charset, errors)
except (LookupError, UnicodeError, AssertionError):
try:
payload = payload.decode("ascii", "ignore")
except UnicodeError:
continue
yield payload, part
else:
yield None, part
def get_from_addresses(self):
    """Yield the sender addresses of the message.

    The addresses of the 'Resent-From' header are used when there are
    any; otherwise the addresses of all the FROM_HEADERS are yielded.
    """
    resent = self.get_all_addr_header('Resent-From')
    if not resent:
        for header_name in FROM_HEADERS:
            for found in self.get_all_addr_header(header_name):
                yield found
        return
    for found in resent:
        yield found
def get_to_addresses(self):
    """Yield the recipient addresses of the message.

    The addresses of the 'Resent-To' and 'Resent-Cc' headers are used
    when there are any; otherwise the addresses of all the TO_HEADERS
    are yielded.
    """
    resent = self.get_all_addr_header('Resent-To')
    resent += self.get_all_addr_header('Resent-Cc')
    if not resent:
        for header_name in TO_HEADERS:
            for found in self.get_all_addr_header(header_name):
                yield found
        return
    for found in resent:
        yield found
@property
def msgid(self):
    """Generate a unique ID for the message.

    If the message already has an ID that should be unique, in the
    Message-ID header, then simply use that. Otherwise, generate an
    ID from the Date header and message content.

    :return: the Message-ID without the surrounding whitespace and
      angle brackets, or "<sha1 hexdigest>@sa_generated".
    """
    # SA potentially produces multiple IDs, and checks them both.
    # That seems an unnecessary complication, so just return the
    # first one that we manage to generate.
    msgid = self.msg[u"Message-ID"]
    if msgid and not re.match(r"^\s*<\s*(?:\@sa_generated)?>.*$", msgid):
        # Remove \r and < and > prefix / suffixes.
        return msgid.strip().strip(u"<").strip(u">")
    # Use the hexdigest of a SHA1 hash of (Date: and top N bytes of
    # body), where N is min(1024 bytes, 1/2 of body length).
    date = self.msg[u"Date"] or u"None"
    flattened = self.msg.as_string()
    _headers, separator, body = flattened.partition("\n\n")
    if not separator:
        # No blank line between headers and body (for example a message
        # without any headers): hash the whole text instead of raising
        # IndexError.
        body = flattened
    if len(body) > 64:
        keep = 1024 if len(body) > 2048 else (len(body) // 2)
        body = body[:keep]
    # Strip all CR and LF so that testing midstream from MTA and
    # post delivery don't generate different IDs simply because of
    # LF<->CR<->CRLF changes.
    body = body.replace("\n", "").replace("\r", "")
    combined = "{date}\x00{body}".format(date=date, body=body)
    msgid = u"%s@sa_generated" % hashlib.sha1(
        combined.encode('utf-8')
    ).hexdigest()
    return msgid
@property
def receive_date(self):
    """Get the date from the headers.

    The date is taken from the part after the last ";" of the first
    Received header that has a parseable one.  When no Received header
    supplies a date the current time is returned.

    :return: seconds since the epoch.
    """
    # NOTE(review): email.utils.parsedate() drops the timezone offset,
    # so the timestamp is computed as if the date were UTC; consider
    # parsedate_tz() / mktime_tz().
    received = self.msg.get_all("Received") or list()
    for header in received:
        try:
            ts = header.rsplit(";", 1)[1]
        except IndexError:
            # No ";" in this header, so it has no date part.
            continue
        parsed = email.utils.parsedate(ts)
        if parsed is None:
            # Unparseable date: calendar.timegm(None) would raise a
            # TypeError, so try the next header instead.
            continue
        return calendar.timegm(parsed)
    # SA will look in other headers too. Perhaps we should also?
    return time.time()
# Headers searched by Message.get_from_addresses when the message has no
# 'Resent-From' address.
FROM_HEADERS = (
    'From',
    "Envelope-Sender",
    'Resent-From',
    'X-Envelope-From',
    'EnvelopeFrom',
)
# Headers searched by Message.get_to_addresses when the message has no
# 'Resent-To' / 'Resent-Cc' address.
TO_HEADERS = (
    'To',
    'Resent-To',
    'Resent-Cc',
    'Apparently-To',
    'Delivered-To',
    'Envelope-Recipients',
    'Apparently-Resent-To',
    'X-Envelope-To',
    'Envelope-To',
    'X-Delivered-To',
    'X-Original-To',
    'X-Rcpt-To',
    'X-Real-To',
    'Cc',
)