# oa/plugins/wlbl_eval.py
""" WLBLEval plugin."""
from __future__ import absolute_import
from builtins import str
import re
from collections import defaultdict
import ipaddress
import oa.plugins.base
from oa.regex import Regex
from oa.networks import _format_network_str
#TL_TLDS = ['com', 'co.uk', 'multi.surbl.org']
# Plugin providing the whitelist/blacklist eval rules: address lists
# (from/to), "from + received relay" whitelists and URI host lists.
class WLBLEvalPlugin(oa.plugins.base.BasePlugin):
# Names of the methods of this class that are exposed as eval rules.
# NOTE(review): presumably consumed by the base plugin to register the
# rules -- confirm against oa.plugins.base.
eval_rules = ("check_from_in_whitelist", "check_to_in_whitelist",
"check_from_in_blacklist", "check_to_in_blacklist",
"check_from_in_list", "check_to_in_all_spam",
"check_to_in_list", "check_mailfrom_matches_rcvd",
"check_from_in_default_whitelist",
"check_forged_in_whitelist",
"check_to_in_more_spam", "check_forged_in_default_whitelist",
"check_uri_host_listed", "check_uri_host_in_whitelist",
"check_uri_host_in_blacklist"
)
# Raw configuration options read by this plugin. Each value is a
# (kind, default) pair; the kinds used here are "append_split" and "list".
# NOTE(review): the exact meaning of each kind is defined by the base
# plugin and is not visible in this file.
options = {
"blacklist_from": ("append_split", []),
"whitelist_from": ("append_split", []),
"whitelist_to": ("append_split", []),
"blacklist_to": ("append_split", []),
"all_spam_to": ("append_split", []),
"more_spam_to": ("append_split", []),
"def_whitelist_from_rcvd": ("list", []),
"whitelist_from_rcvd": ("list", []),
"whitelist_allow_relays": ("append_split", []),
"enlist_uri_host": ("list", []),
"delist_uri_host": ("list", []),
"blacklist_uri_host": ("list", []),
"whitelist_uri_host": ("list", []),
"util_rb_tld": ("append_split", []),
"util_rb_2tld": ("append_split", []),
"util_rb_3tld": ("append_split", [])
}
# Names under which check_start() stores the parsed form of the options
# above ("parsed_<option>").
# NOTE(review): "parsed_whitelist_uri_host" and "parsed_blacklist_uri_host"
# are declared as "dict" here, but check_start() stores the set returned by
# parse_wlbl_uri() in them.
parsed_lists = {
"parsed_blacklist_from": ("list", []),
"parsed_whitelist_from": ("list", []),
"parsed_whitelist_to": ("list", []),
"parsed_blacklist_to": ("list", []),
"parsed_all_spam_to": ("list", []),
"parsed_more_spam_to": ("list", []),
"parsed_def_whitelist_from_rcvd": ("dict", {}),
"parsed_whitelist_from_rcvd": ("dict", {}),
"parsed_whitelist_allow_relays": ("list", []),
"parsed_enlist_uri_host": ("dict", {}),
"parsed_delist_uri_host": ("dict", {}),
"parsed_whitelist_uri_host": ("dict", {}),
"parsed_blacklist_uri_host": ("dict", {})
}
def check_start(self, msg):
    """Reset the per-message whitelist state and rebuild the parsed lists.

    Both whitelist state flags of the message are set to 0, then every
    raw option is converted by its parser and stored under the
    "parsed_<option>" key.

    :param msg: the message about to be checked; only used to store
      the two state flags.
    """
    for flag in ("from_in_whitelist", "from_in_default_whitelist"):
        self.set_local(msg, flag, 0)

    # Plain address lists.
    address_options = ("whitelist_from", "whitelist_to", "blacklist_from",
                       "blacklist_to", "all_spam_to", "more_spam_to")
    for option in address_options:
        self["parsed_%s" % option] = self.parse_list(option)

    # "address + relay" whitelists.
    for option in ("def_whitelist_from_rcvd", "whitelist_from_rcvd"):
        self["parsed_%s" % option] = self.parse_input(option)

    self["parsed_whitelist_allow_relays"] = self.parse_list(
        "whitelist_allow_relays")

    # URI host lists: the delist and the white/black sets must exist
    # before the enlist is parsed, because parse_list_uri reads them.
    self["parsed_delist_uri_host"] = self.parse_delist_uri()
    for option in ("whitelist_uri_host", "blacklist_uri_host"):
        self["parsed_%s" % option] = self.parse_wlbl_uri(option)
    self["parsed_enlist_uri_host"] = self.parse_list_uri("enlist_uri_host")
def check_input(self, address):
    """Count how many marker substrings occur in *address*.

    The markers are "?", "@", "." and "*@". Callers use the count as a
    truth value: 0 means the entry contains none of them.
    """
    markers = ("?", "@", ".", "*@")
    return sum(1 for marker in markers if marker in address)
def escape_addr(self, address):
    """Convert a glob-style address into a regular expression string.

    Everything is regex-escaped first; an escaped "*" then becomes ".*"
    and an escaped "?" becomes ".?".
    """
    escaped = re.escape(address)
    with_stars = escaped.replace(r"\*", ".*")
    return with_stars.replace(r"\?", ".?")
def parse_input(self, list_name):
    """Build the "address pattern -> relay domains" mapping of an option.

    Each entry of ``self[list_name]`` is expected to look like
    "<address> <domain> [<domain> ...]". Entries without a domain part,
    or whose address fails check_input(), are ignored.

    :param list_name: name of the raw option to read.
    :return: defaultdict(list) keyed by the escaped address regex, with
      the list of domains given for it.
    """
    by_pattern = defaultdict(list)
    for entry in self[list_name]:
        try:
            glob, domains = entry.split(None, 1)
        except ValueError:
            # Fewer than two whitespace separated parts.
            continue
        if not self.check_input(glob):
            continue
        for domain in domains.split():
            by_pattern[self.escape_addr(glob)].append(domain)
    return by_pattern
def parse_list(self, list_name):
    """Convert the address globs of an option into regex strings.

    Entries that fail check_input() are dropped. The remaining ones are
    escaped with escape_addr(); a pattern without "@" is treated as a
    bare domain and gets ".*@" prepended so it matches any local part.

    :param list_name: name of the raw option to read.
    :return: list of regex strings, in configuration order.
    """
    parsed_list = []
    for entry in self[list_name]:
        # Reuse the shared helpers instead of duplicating their logic.
        if not self.check_input(entry):
            continue
        pattern = self.escape_addr(entry)
        if "@" not in pattern:
            pattern = ".*@" + pattern
        parsed_list.append(pattern)
    return parsed_list
def my_list(self):
    """Return a fresh record with empty "in_list" and "not_in_list" lists.

    Used as the default factory for the parsed URI host lists.
    """
    return {bucket: [] for bucket in ("in_list", "not_in_list")}
def parse_delist_uri(self):
    """Parse the 'delist_uri_host' option.

    An entry containing "(" starts with a list name in parentheses and
    its hosts are filed under that name, stripped of surrounding ".".
    Any other entry applies to every list: its hosts are filed under
    'ALL', stripped of surrounding "!".

    :return: defaultdict(list) mapping list name (or 'ALL') to hosts.
    """
    delisted = defaultdict(list)
    for entry in self['delist_uri_host']:
        tokens = entry.split()
        if "(" not in entry:
            # No list name given: delist from all lists.
            for host in tokens:
                delisted['ALL'].append(host.strip("!"))
            continue
        list_key = tokens[0].strip("( ").rstrip(" )")
        for host in tokens[1:]:
            delisted[list_key].append(host.strip("."))
    return delisted
def add_in_list(self, key, item, parsed_list):
    """File one host under ``parsed_list[key]``.

    A host starting with "!" is an exclusion: it goes to "not_in_list"
    with the surrounding "!" stripped. Any other host goes to "in_list"
    unchanged.
    """
    if item.startswith("!"):
        bucket, host = "not_in_list", item.strip("!")
    else:
        bucket, host = "in_list", item
    parsed_list[key][bucket].append(host)
def add_in_dict(self, list_name, key, parsed_list):
    """Add hosts to ``parsed_list[key]``, skipping the delisted ones.

    :param list_name: iterable of hosts to add (despite the name, not an
      option name).
    :param key: name of the URI host list being filled.
    :param parsed_list: mapping updated through add_in_list().

    A host is skipped when it appears in 'parsed_delist_uri_host' either
    under *key* or under 'ALL'.
    """
    removed = self['parsed_delist_uri_host']
    for host in list_name:
        if host in removed[key] or host in removed['ALL']:
            continue
        self.add_in_list(key, host, parsed_list)
def parse_wlbl_uri(self, list_name):
    """Parse whitelist_uri_host or blacklist_uri_host.

    :param list_name: name of the raw option to read.
    :return: set of every whitespace separated host found in the
      option's entries.
    """
    return {host for entry in self[list_name] for host in entry.split()}
def parse_list_uri(self, list_name):
    """Parse a URI host list option into per-list include/exclude records.

    Each entry looks like "(<list name>) host [host ...]"; the first
    token, with its parentheses stripped, names the list. Hosts are added
    through add_in_dict(), so delisted hosts are left out. The hosts from
    'parsed_whitelist_uri_host' and 'parsed_blacklist_uri_host' are then
    added under the 'WHITE' and 'BLACK' lists.

    :param list_name: name of the raw option to read.
    :return: defaultdict mapping a list name to a dictionary with the
      "in_list" and "not_in_list" hosts.
    """
    parsed_list = defaultdict(self.my_list)
    for entry in self[list_name]:
        tokens = entry.split()
        if not tokens:
            # A blank entry has no list name; indexing it would raise
            # IndexError, so just ignore it.
            continue
        key = tokens[0].strip("( ").rstrip(" )")
        self.add_in_dict(tokens[1:], key, parsed_list)
    self.add_in_dict(self['parsed_whitelist_uri_host'], 'WHITE',
                     parsed_list)
    self.add_in_dict(self['parsed_blacklist_uri_host'], 'BLACK',
                     parsed_list)
    return parsed_list
def check_in_list(self, msg, addresses, list_name):
    """Look the addresses up in a whitelist and record the outcome.

    For each address, the regexes of ``self[list_name]`` are tried
    first, then the 'parsed_whitelist_from_rcvd' list through
    check_whitelist_rcvd(). The "from_in_whitelist" value of the
    message is set to 1 on a match, or to -1 when
    check_whitelist_rcvd() returns -1.

    :return: True as soon as one address is whitelisted, else False.
    """
    flag = "from_in_whitelist"
    for address in addresses:
        if any(Regex(pattern).search(address)
               for pattern in self[list_name]):
            self.set_local(msg, flag, 1)
            return True
        verdict = self.check_whitelist_rcvd(
            msg, "parsed_whitelist_from_rcvd", address)
        if verdict == 1:
            self.set_local(msg, flag, 1)
            return True
        if verdict == -1:
            # Remember the -1 verdict but keep trying the other addresses.
            self.set_local(msg, flag, -1)
    return False
def check_address_in_list(self, addresses, list_name):
    """Tell whether any address matches any regex of ``self[list_name]``.

    :param addresses: iterable of address strings.
    :param list_name: name of a parsed list holding regex strings.
    :return: True on the first match, False when nothing matches.
    """
    return any(
        Regex(pattern).search(address)
        for address in addresses
        for pattern in self[list_name]
    )
def check_in_default_whitelist(self, msg, addresses, list_name):
    """Look the addresses up in a "from + relay" whitelist.

    Every address is passed to check_whitelist_rcvd() with *list_name*.
    On a result of 1 the "from_in_default_whitelist" value of the
    message is set to 1 and True is returned. Otherwise the value is
    set to -1 if any lookup returned -1, else to 0, and False is
    returned.
    """
    flag = "from_in_default_whitelist"
    outcome = 0
    for address in addresses:
        verdict = self.check_whitelist_rcvd(msg, list_name, address)
        if verdict == 1:
            self.set_local(msg, flag, 1)
            return True
        if verdict == -1:
            outcome = -1
    self.set_local(msg, flag, outcome)
    return False
def check_in_TL_TLDS(self, address):
    """Tell whether *address* is listed in one of the util_rb_* options.

    The "util_rb_tld", "util_rb_2tld" and "util_rb_3tld" options are
    searched in that order.
    """
    tld_options = ("util_rb_tld", "util_rb_2tld", "util_rb_3tld")
    return any(address in self[option] for option in tld_options)
def base_domain(self, address):
    """Reduce a host name to its domain plus TLD.

    * Fewer than three labels: returned unchanged.
    * Only numeric labels: the labels are returned in reverse order.
    * Last three labels known to check_in_TL_TLDS(): last four labels.
    * Last two labels known to check_in_TL_TLDS(): last three labels.
    * Otherwise: last two labels.
    """
    labels = address.split('.')
    if len(labels) < 3:
        return ".".join(labels)
    if all(label.isdigit() for label in labels):
        return ".".join(reversed(labels))
    # Try the longer suffix first so a 3-label TLD wins over a 2-label one.
    for tld_size in (3, 2):
        if self.check_in_TL_TLDS(".".join(labels[-tld_size:])):
            return ".".join(labels[-(tld_size + 1):])
    return ".".join(labels[-2:])
def check_from_in_whitelist(self, msg, target=None):
    """Eval rule: is a From address of the message whitelisted?

    Delegates to _check_whitelist() for the "from_in_whitelist" state.

    :param target: not used by this rule.
    """
    state_name = "from_in_whitelist"
    return self._check_whitelist(msg, state_name)
def _check_whitelist(self, msg, check_name):
    """Run the whitelist or default whitelist lookup once per message.

    The lookup only happens while the stored *check_name* value of the
    message is still 0. "from_in_whitelist" is checked against
    'parsed_whitelist_from'; any other name is checked against
    'parsed_def_whitelist_from_rcvd'.

    :return: True when the stored value is greater than 0 afterwards.
    """
    senders = msg.get_from_addresses()
    undecided = self.get_local(msg, check_name) == 0
    if undecided and check_name == "from_in_whitelist":
        self.check_in_list(msg, senders, 'parsed_whitelist_from')
    elif undecided:
        self.check_in_default_whitelist(
            msg, senders, 'parsed_def_whitelist_from_rcvd')
    return self.get_local(msg, check_name) > 0
def check_to_in_whitelist(self, msg, target=None):
    """Eval rule: does a To address match the 'whitelist_to' regexes?

    :param target: not used by this rule.
    """
    recipients = msg.get_to_addresses()
    return self.check_address_in_list(recipients, 'parsed_whitelist_to')
def check_from_in_blacklist(self, msg, target=None):
    """Eval rule: does a From address match the 'blacklist_from' regexes?

    :param target: not used by this rule.
    """
    senders = msg.get_from_addresses()
    return self.check_address_in_list(senders, 'parsed_blacklist_from')
def check_to_in_blacklist(self, msg, target=None):
    """Eval rule: does a To address match the 'blacklist_to' regexes?

    :param target: not used by this rule.
    """
    recipients = msg.get_to_addresses()
    return self.check_address_in_list(recipients, 'parsed_blacklist_to')
def check_from_in_list(self, msg, list_name, target=None):
    """Eval rule: does a From address match the regexes of a given list?

    :param list_name: option name; its "parsed_<list_name>" form is
      searched. An empty name never matches.
    :param target: not used by this rule.
    """
    if list_name:
        senders = msg.get_from_addresses()
        return self.check_address_in_list(senders,
                                          "parsed_%s" % list_name)
    return False
def check_to_in_list(self, msg, list_name, target=None):
    """Eval rule: does a To address match the regexes of a given list?

    :param list_name: option name; its "parsed_<list_name>" form is
      searched. An empty name never matches, mirroring
      check_from_in_list().
    :param target: not used by this rule.
    """
    if not list_name:
        # Without a name there is no "parsed_<name>" list to look up.
        return False
    parsed_list_name = "parsed_%s" % list_name
    return self.check_address_in_list(msg.get_to_addresses(),
                                      parsed_list_name)
def check_to_in_all_spam(self, msg, target=None):
    """Eval rule: does a To address match the 'all_spam_to' regexes?

    :param target: not used by this rule.
    """
    recipients = msg.get_to_addresses()
    return self.check_address_in_list(recipients, 'parsed_all_spam_to')
def check_to_in_more_spam(self, msg, target=None):
    """Eval rule: does a To address match the 'more_spam_to' regexes?

    :param target: not used by this rule.
    """
    recipients = msg.get_to_addresses()
    return self.check_address_in_list(recipients, 'parsed_more_spam_to')
def check_from_in_default_whitelist(self, msg, target=None):
    """Eval rule: is a From address in the default whitelist?

    Delegates to _check_whitelist() for the "from_in_default_whitelist"
    state.

    :param target: not used by this rule.
    """
    state_name = "from_in_default_whitelist"
    return self._check_whitelist(msg, state_name)
def check_mailfrom_matches_rcvd(self, msg, target=None):
    """Eval rule: does the envelope sender domain match a relay rDNS?

    The domain is what follows the last "@" of ``msg.sender_address``,
    lower-cased and reduced with base_domain(). Only the first
    untrusted relay is inspected; when there is none, all trusted
    relays are. Relays with an empty rDNS are skipped. A relay matches
    when its lower-cased rDNS equals the domain or ends with
    "." + domain, so "notexample.com" does not match "example.com".

    :param target: not used by this rule.
    :return: True on the first matching relay. False when there is no
      sender, no domain in the sender address, no relay, or no match.
    """
    address = msg.sender_address
    if not address:
        return False
    # Take the part after the last "@"; an address without one (or with
    # nothing after it) has no domain to compare.
    _local, at_sign, host = address.rpartition("@")
    if not at_sign or not host:
        return False
    domain = self.base_domain(host.lower())

    if len(msg.untrusted_relays) > 0:
        relays = [msg.untrusted_relays[0]]
    elif len(msg.trusted_relays) > 0:
        relays = list(msg.trusted_relays)
    else:
        return False

    for relay in relays:
        rdns = relay["rdns"]
        if not rdns:
            continue
        rdns = rdns.lower()
        # Compare on a label boundary, as check_rcvd() does.
        if rdns == domain or rdns.endswith(".%s" % domain):
            return True
    return False
def check_forged_in_whitelist(self, msg, target=None):
    """Eval rule: does the sender look forged against the whitelist?

    Runs check_from_in_whitelist() and
    check_from_in_default_whitelist(), then reads both stored states.

    :param target: not used by this rule.
    :return: True when "from_in_whitelist" is negative and
      "from_in_default_whitelist" is 0.
    """
    self.check_from_in_whitelist(msg)
    self.check_from_in_default_whitelist(msg)
    whitelist_state = self.get_local(msg, "from_in_whitelist")
    default_state = self.get_local(msg, "from_in_default_whitelist")
    return whitelist_state < 0 and default_state == 0
def check_forged_in_default_whitelist(self, msg, target=None):
    """Eval rule: does the sender look forged against the default whitelist?

    Runs check_from_in_whitelist() and
    check_from_in_default_whitelist(), then reads both stored states.

    :param target: not used by this rule.
    :return: True when "from_in_whitelist" is 0 and
      "from_in_default_whitelist" is negative.
    """
    self.check_from_in_whitelist(msg)
    self.check_from_in_default_whitelist(msg)
    whitelist_state = self.get_local(msg, "from_in_whitelist")
    default_state = self.get_local(msg, "from_in_default_whitelist")
    return whitelist_state == 0 and default_state < 0
def check_whitelist_rcvd(self, msg, list_name, address):
    """Look up an address and the message relays in a "rcvd" whitelist.

    Only the first untrusted relay is considered; when there is none,
    all trusted relays are. The address is lower-cased before matching.

    :param list_name: name of a parsed list built by parse_input(): a
      mapping of address regex to the relay domains allowed for it.
    :param address: sender address to look up.
    :return: 1 when the address matches an entry and check_rcvd()
      accepts one of its domains. 0 when the message has no relays.
      Otherwise the result of check_found_forged(): -1 when the address
      matched but no relay did, unless an allow-relays entry exempts
      the address, else 0.
    """
    if len(msg.untrusted_relays) > 0:
        relays = [msg.untrusted_relays[0]]
    elif len(msg.trusted_relays) > 0:
        relays = list(msg.trusted_relays)
    else:
        return 0

    address = address.lower()
    found_forged = 0
    whitelist = self[list_name]
    for white_addr in whitelist:
        # The keys are already regexes produced by escape_addr(), so
        # they are used as they are: converting "*" a second time would
        # turn ".*" into "..*" and stop "foo*@x" from matching "foo@x".
        if not Regex(white_addr).search(address):
            continue
        for domain in whitelist[white_addr]:
            if self.check_rcvd(domain, relays) == 1:
                return 1
            # Address listed, relay not: possibly forged.
            found_forged = -1
    return self.check_found_forged(address, found_forged)
def check_rcvd(self, domain, relays):
    """Match a whitelisted relay specification against the relays.

    The specification, with surrounding "[", "]" and spaces removed, is
    first tried as an IP network that must contain the relay IP. When
    that raises ValueError, the lower-cased relay rDNS must equal the
    specification or end with "." followed by it.

    :param domain: relay specification from the whitelist entry.
    :param relays: relays to check; each provides 'ip' and 'rdns'.
    :return: 1 on the first matching relay, -1 when none matches.
    """
    listed = domain.strip("[ ").rstrip(" ]")
    for relay in relays:
        try:
            network = ipaddress.ip_network(
                _format_network_str(str(listed), None))
            if ipaddress.ip_address(str(relay['ip'])) in network:
                return 1
        except ValueError:
            # Not usable as IP data: compare host names instead.
            rdns = relay['rdns'].lower()
            if rdns == listed or rdns.endswith(".%s" % listed):
                return 1
    return -1
def check_found_forged(self, address, found_forged):
    """Clear a forgery verdict for addresses exempted from relay checks.

    :param address: sender address to test.
    :param found_forged: current verdict; a false value is returned
      unchanged without any lookup.
    :return: 0 when the address matches a regex of
      'parsed_whitelist_allow_relays', else *found_forged*.
    """
    if not found_forged:
        return found_forged
    exempted = any(Regex(pattern).search(address)
                   for pattern in self['parsed_whitelist_allow_relays'])
    return 0 if exempted else found_forged
def check_uri_host_listed(self, msg, list_name, target=None):
    """Eval rule: does the message contain a URI from a URI host list?

    Each entry of ``msg.uri_list`` is skipped when it is in the
    "not_in_list" hosts of the list, and matches when it ends with one
    of the "in_list" hosts.

    :param list_name: name of the list inside 'parsed_enlist_uri_host'.
    :param target: not used by this rule.
    :return: True on the first matching URI, else False.
    """
    parsed_list = 'parsed_enlist_uri_host'
    for uri in msg.uri_list:
        hosts = self[parsed_list][list_name]
        if uri in hosts['not_in_list']:
            continue
        if any(uri.endswith(suffix) for suffix in hosts['in_list']):
            return True
    return False
def check_uri_host_in_whitelist(self, msg, target=None):
    """Eval rule: shorthand for check_uri_host_listed() on 'WHITE'.

    :param target: not used by this rule.
    """
    list_name = 'WHITE'
    return self.check_uri_host_listed(msg, list_name, None)
def check_uri_host_in_blacklist(self, msg, target=None):
    """Eval rule: shorthand for check_uri_host_listed() on 'BLACK'.

    :param target: not used by this rule.
    """
    list_name = 'BLACK'
    return self.check_uri_host_listed(msg, list_name, None)