SpamExperts/OrangeAssassin

View on GitHub
oa/networks.py

Summary

Maintainability
A
2 hrs
Test Coverage
import re
import logging
import ipaddress

from builtins import str
from builtins import object
from oa.regex import Regex

_NETWORK_RE = Regex(r"""
^(?P<exclude>!?)
    \[?
        (?P<ip>
            [0-9.]+|           # IPv4
            [0-9a-f:]*         # IPv6
        )
    \]?

    /?(?P<port>[0-9]{0,5})$
""", re.I | re.S | re.M | re.X)


def _format_network_str(network, mask):
    padding = ""
    if not network.endswith("."):
        length = len(network.split("."))
        if length == 3 or length == 7:
            if "/" not in network:
                network = network + "."
    if network.endswith("."):
        padding = ".".join(["0"] * (4 - network.count(".")))
        if not mask:
            mask = network.count(".") * 8
    if mask:
        return str("%s%s/%s" % (network, padding, mask))
    return str("%s%s" % (network, padding))

class NetworkListBase(object):
    _always_accepted = ()
    configured = False

    def __init__(self):
        self._networks = []
        self._networks.extend(self._always_accepted)

    def add(self, network, accepted):
        self._networks.append((network, accepted))

    def clear(self):
        self._networks = []
        self._networks.extend(self._always_accepted)

    def __contains__(self, query):
        for network, accepted in self._networks:
            if query in network:
                return accepted
        return False


class TrustedNetworks(NetworkListBase):
    _always_accepted = (
        (ipaddress.ip_network(str("127.0.0.0/8")), True),
        (ipaddress.ip_network(str("::1")), True),
    )


class InternalNetworks(TrustedNetworks):
    pass


class MSANetworks(NetworkListBase):
    pass


class NetworkList(object):
    internal = InternalNetworks()
    trusted = TrustedNetworks()
    msa = MSANetworks()

    def __init__(self):
        self.log = logging.getLogger("oa-logger")

    @property
    def configured(self):
        return self.internal.configured or self.trusted.configured

    def _extract_network(self, network_str):
        excluded, network, mask = _NETWORK_RE.match(network_str).groups()
        clean_value = _format_network_str(network, mask)
        try:
            network = ipaddress.ip_network(clean_value)
        except ValueError:
            return excluded, None
        return excluded, network

    def add_trusted_network(self, network_str):
        excluded, network = self._extract_network(network_str)
        self.trusted.configured = True
        self.trusted.add(network, not excluded)
        if not self.internal.configured:
            self.internal.add(network, not excluded)

    def add_internal_network(self, network_str):
        excluded, network = self._extract_network(network_str)
        if not self.internal.configured and self.internal:
            self.internal.clear()
        self.internal.configured = True
        self.trusted.add(network, not excluded)
        self.internal.add(network, not excluded)

    def add_msa_network(self, network_str):
        excluded, network = self._extract_network(network_str)
        self.internal.configured = True
        self.msa.add(network, not excluded)