SpamExperts/OrangeAssassin

View on GitHub
oa/rules/parser.py

Summary

Maintainability
D
1 day
Test Coverage
"""Parse OA rule sets.

The general syntax for OA rules is (on one line):

<type> <name> <value>

Various options can be defined for a rule and they get bundled up using
the name as unique identifier.
"""

from __future__ import absolute_import

from builtins import dict
from builtins import object

import re
import os
import warnings
import contextlib
import collections
import locale

import oa.config
import oa.errors
import oa.context
import oa.plugins
import oa.rules.uri
import oa.rules.body
import oa.rules.meta
import oa.rules.full
import oa.rules.eval_
import oa.rules.header
import oa.rules.ruleset

from oa.regex import Regex

# Simple protection against recursion with "include".
MAX_RECURSION = 10

# Rules that require 2 arguments
KNOWN_2_RTYPE = frozenset(
    (
        "score",  # Specifies the score adjustment if the rule matches
        "priority",  # Specifies the priority of the rule
        "describe",  # Specifies a comment describing the rule
        "full",  # Specifies a FullRule
        "body",  # Specifies a BodyRule
        "rawbody",  # Specifies a RawBodyRule
        "uri",  # Specifies a URIRule
        "header",  # Specifies a HeaderRule
        "mimeheader",  # Specifies a MimeHeaderRule
        "meta",  # Specifies a MetaRule
        "eval",  # Specifies a EvalRule
        "lang",  # Specifies a language
        "tflags",  # Specifies a TflagRule
    )
)
# Rules that require 1 arguments
KNOWN_1_RTYPE = frozenset(
    (
        "report",  # Add some text to the report template
        "unsafe_report",  # Add some text to the unsafe report template
        "add_header",  # Adds a header to the message
        "remove_header",  # Remove header from message
        "include",  # Include another file in the current one
        "ifplugin",  # Check if plugin is loaded.
        "loadplugin",  # Load a plugin.
        "require_version",  # Only load this file if the version matches
        "required_score",  # Set the required score for this ruleset
        "report_safe",  # Set the method of reporting spam
        "report_contact",  # Set the contact address
        "required_score",  # Set the required score (default 5)
    )
)

# These are the types of rules that we know how to interpret, ignore anything
# else. These include rule types and option types.
KNOWN_RTYPE = KNOWN_1_RTYPE | KNOWN_2_RTYPE

# These types define rules and not options. Map against their specific class.
RULES = {
    "full": oa.rules.full.FullRule,
    "body": oa.rules.body.BodyRule,
    "rawbody": oa.rules.body.RawBodyRule,
    "uri": oa.rules.uri.URIRule,
    "meta": oa.rules.meta.MetaRule,
    "header": oa.rules.header.HeaderRule,
    "mimeheader": oa.rules.header.MimeHeaderRule,
    "eval": oa.rules.eval_.EvalRule,
}

_COMMENT_P = Regex(r"((?<=[^\\])#.*)")


class PADParser(object):
    """Parses PAD ruleset and extracts and combines the relevant data.

    Note that this is not thread-safe.
    """

    def __init__(self, paranoid=False, ignore_unknown=True, lazy_mode=True):
        self.ctxt = oa.context.GlobalContext(paranoid=paranoid,
                                             ignore_unknown=ignore_unknown,
                                             lazy_mode=lazy_mode)
        # XXX This could be a default OrderedDict
        self.results = collections.OrderedDict()
        self.ruleset = oa.rules.ruleset.RuleSet(self.ctxt)
        self._ignore = False

    @contextlib.contextmanager
    def _paranoid(self, *exceptions):
        """If not paranoid ignore the specified exceptions."""
        try:
            yield
        except exceptions as e:
            self.ctxt.err(e)
            if self.ctxt.paranoid:
                raise

    def parse_file(self, filename, _depth=0):
        """Parses a single PAD ruleset file."""
        if _depth > MAX_RECURSION:
            raise oa.errors.MaxRecursionDepthExceeded()
        self.ctxt.log.debug("Parsing file: %s", filename)
        if not os.path.isfile(filename):
            self.ctxt.log.warn("Ignoring %s, not a file", filename)
            return
        with open(filename, "rb") as rulef:
            for line_no, line in enumerate(rulef):
                try:
                    with self._paranoid(oa.errors.InvalidSyntax):
                        self._handle_line(filename, line, line_no + 1, _depth)
                except oa.errors.PluginLoadError as e:
                    warnings.warn(str(e))
                    self.ctxt.log.warn("%s", e)

    def _handle_line(self, filename, line, line_no, _depth=0):
        """Handles a single line."""
        try:
            line = line.decode("iso-8859-1").strip()
        except UnicodeDecodeError as e:
            raise oa.errors.InvalidSyntax(filename, line_no, line,
                                          "Decoding Error: %s" % e)
        # if line.startswith("if can"):
        #     # XXX We don't support for this check, simply
        #     # XXX skip everything for now.
        #     self._ignore = True
        #     return

        if line.startswith("endif"):
            self._ignore = False
            return

        if line.startswith("else"):
            if self._ignore:
                self._ignore = False
            else:
                self._ignore = True
            return

        if line.startswith("require_version"):
            # XXX We don't really have any use for this now
            # XXX Just skip it.
            return

        if not line or line.startswith("#") or self._ignore:
            return

        # Remove any comments
        line = _COMMENT_P.sub("", line).strip()

        try:
            rtype, value = line.split(None, 1)
        except ValueError:
            # Some plugin might know how to handle this line
            rtype, value = line, ""

        if rtype == "include":
            dirname = None
            if os.path.isabs(filename):
                dirname = os.path.dirname(filename)
            self._handle_include(value, line, line_no, _depth,
                                 dirname=dirname)
        elif rtype == "ifplugin":
            self._handle_ifplugin(value)
        elif rtype == "loadplugin":
            self._handle_loadplugin(value)
        elif rtype in KNOWN_2_RTYPE or rtype in self.ctxt.cmds:
            try:
                rtype, name, value = line.split(None, 2)
            except ValueError:
                raise oa.errors.InvalidSyntax(filename, line_no, line,
                                              "Missing argument")
            if rtype == "tflags":
                value = value.split()

            if rtype == "lang":
                locale.setlocale(locale.LC_ALL, '')
                locale_language = locale.getlocale(locale.LC_MESSAGES)[0]
                if not locale_language.startswith(name):
                    self.ctxt.log.debug("Lang argument does not"
                                        "correspond with locales")
                    return

                if "report" in value:
                    try:
                        rtype, value = value.split(None, 1)
                    except ValueError:
                        raise oa.errors.InvalidSyntax(filename, line_no, line,
                                                      "Missing argument")

                    if not self.ctxt.hook_parse_config(rtype, value):
                        self.ctxt.err("%s:%s Ignoring unknown"
                                      "configuration line: %s",
                                      filename, line_no, line)
                    return

                try:
                    rtype, name, value = value.split(None, 2)
                except ValueError:
                    raise oa.errors.InvalidSyntax(filename, line_no, line,
                                                  "Missing argument")

            if name not in self.results:
                self.results[name] = dict()

            if rtype in RULES or rtype in self.ctxt.cmds:
                if value.startswith("eval:"):
                    # This is for compatibility with SA ruleset
                    self.results[name]["target"] = rtype
                    rtype = "eval"
                self.results[name]["type"] = rtype
                self.results[name]["value"] = value
            else:
                if rtype == 'priority':
                    try:
                        int(value)
                    except ValueError:
                        self.ctxt.err("%s:%s Invalid type for priority value "
                                      "in configuration line: %s, setting it "
                                      "by"
                                      " default to 0", filename, line_no, line)
                self.results[name][rtype] = value

        else:
            if not self.ctxt.hook_parse_config(rtype, value):
                self.ctxt.err("%s:%s Ignoring unknown configuration line: %s",
                              filename, line_no, line)

    def _handle_include(self, value, line, line_no, _depth=0,
                        dirname=None):
        """Handles the 'include' keyword."""
        filename = value.strip()
        if not os.path.isabs(filename) and dirname is not None:
            filename = os.path.join(dirname, filename)
        try:
            self.parse_file(filename, _depth=_depth + 1)
        except oa.errors.MaxRecursionDepthExceeded as e:
            e.add_call(filename, line_no, line)
            raise e

    def _handle_ifplugin(self, value):
        """Handles the 'ifplugin' keyword."""
        plugin_name = oa.plugins.REIMPLEMENTED_PLUGINS.get(value, value)
        try:
            plugin_name = plugin_name.rsplit(".", 1)[1]
        except IndexError:
            pass
        if plugin_name not in self.ctxt.plugins:
            self.ctxt.log.debug("Plugin %s not loaded, skipping.", plugin_name)
            self._ignore = True

    def _handle_loadplugin(self, value):
        """Handles the 'loadplugin' keyword."""
        try:
            plugin_name, path = value.split(None, 1)
        except ValueError:
            plugin_name, path = value, None
        if "::" in plugin_name:
            plugin_name = oa.plugins.REIMPLEMENTED_PLUGINS.get(plugin_name)
        if plugin_name:
            self.ctxt.load_plugin(plugin_name, path)
        else:
            self.ctxt.log.warn("Plugin not available: %s", value)

    def get_ruleset(self):
        """Create and return the corresponding ruleset for the parsed files."""
        self.ctxt.hook_parsing_start(self.results)
        for name, data in self.results.items():
            try:
                rule_type = data["type"]
            except KeyError:
                e = oa.errors.InvalidRule(name, "No rule type defined.")
                self.ctxt.err(e)
                if self.ctxt.paranoid:
                    raise e
            else:
                with self._paranoid(oa.errors.InvalidRule,
                                    oa.errors.InvalidRegex):
                    try:
                        rule_class = RULES[rule_type]
                    except KeyError:
                        # A plugin might have been loaded that
                        # can handle this.
                        rule_class = self.ctxt.cmds[rule_type]
                    self.ctxt.log.debug("Adding rule %s with: %s", name, data)
                    rule = rule_class.get_rule(name, data)
                    self.ruleset.add_rule(rule)
        self.ctxt.hook_parsing_end(self.ruleset)
        self.ctxt.log.info("%s rules loaded", len(self.ruleset.checked))
        self.ruleset.post_parsing()
        return self.ruleset


def parse_pad_rules(files, paranoid=False, ignore_unknown=True):
    """Parse a list of PAD rules and returns the corresponding ruleset.

    'files' - a list of file paths.

    Returns a dictionary that maps rule names to a dictionary of rule options.
    Every rule will contain "type" and "value" which corresponds to the
    mandatory line for defining a rule. For example (type, name value):

    body LOCAL_DEMONSTRATION_RULE   /test/

    Other options may be included such as "score", "describe".
    """
    parser = PADParser(paranoid=paranoid, ignore_unknown=ignore_unknown)
    for filename in files:
        parser.parse_file(filename)

    return parser