dnstats/dnstatsio

View on GitHub
dnstats/dnsvalidate/dmarc.py

Summary

Maintainability
D
1 day
Test Coverage
from enum import Enum

from dnstats.grading import update_count_dict


class DmarcErrors(Enum):
    INVALID_DMARC_RECORD = 0
    INVALID_ADKIM_VALUE = 1
    INVALID_ASPF_VALUE = 2
    INVALID_FAILURE_REPORTING_VALUE = 3
    INVALID_POLICY = 4
    INVALID_SUBDOMAIN_POLICY = 5
    MULTIPLE_DMARC_RECORDS = 6
    INVALID_RF_VALUE = 7
    INVALID_RI_VALUE = 8
    INVALID_PCT_VALUE = 9
    INVALID_DMARC_RECORD_START = 10
    NO_DMARC_RECORD = 11


class Dmarc:
    """"
    Validates and represents a DMARC record
    """
    def __init__(self, dmarc_result_set):
        self.dmarc_result_set = dmarc_result_set
        self.result = validate(self.dmarc_result_set)

    @property
    def is_valid(self) -> bool:
        return self.result == 0

    @property
    def errors(self) -> list:
        if self.result['errors']:
            return self.result['errors']
        else:
            return list()

    @property
    def adkim(self) -> str:
        return self.result.get('adkim')

    @property
    def aspf(self) -> str:
        return self.result.get('aspf')

    @property
    def fo(self) -> str:
        return self.result.get('fo')

    @property
    def p(self) -> str:
        """
        Policy of the DMARC policy
        :return:
        """
        return self.result.get('p')

    @property
    def pct(self) -> int:
        value = self.result.get('pct')
        if not value and value != 0:
            return 100
        else:
            return value

    @property
    def rua(self) -> str:
        return self.result.get('rua')

    @property
    def ruf(self) -> str:
        return self.result.get('ruf')

    @property
    def sp(self) -> str:
        return self.result.get('sp')


def validate(dmarc_result_set: list) -> dict:
    dmarc_record_values = dict()
    tag_count = dict()
    errors = list()
    if len(dmarc_result_set) < 1:
        errors.append(DmarcErrors.NO_DMARC_RECORD)
        dmarc_record_values['errors'] = errors
        return dmarc_record_values

    if len(dmarc_result_set) > 1:
        errors.append(DmarcErrors.MULTIPLE_DMARC_RECORDS)
        dmarc_record_values['errors'] = errors
        return dmarc_record_values
    dmarc_record = dmarc_result_set[0]
    has_rua = False
    has_ruf = False
    if dmarc_record and dmarc_record.startswith('v=DMARC1;'):
        parts = dmarc_record.split(';')
        for part in parts:
            sub_parts = part.split('=')
            if len(sub_parts) != 2:
                continue
            tag = sub_parts[0].strip()
            value = sub_parts[1].strip()
            update_count_dict(tag_count, tag)
            # TODO: verify that parts are using valid chars
            if tag == 'adkim':
                dmarc_record_values['adkim'] = value
                if value not in ['s', 'r']:
                    errors.append(DmarcErrors.INVALID_ADKIM_VALUE)
            elif tag == 'aspf':
                dmarc_record_values['aspf'] = value
                if value not in ['s', 'r']:
                    errors.append(DmarcErrors.INVALID_ASPF_VALUE)
            elif tag == 'fo':
                dmarc_record_values['fo'] = value
                if value not in ['1', 'd', 's', '0']:
                    errors.append(DmarcErrors.INVALID_FAILURE_REPORTING_VALUE)
            elif tag == 'p':
                dmarc_record_values['p'] = value
                if value not in ['none', 'quarantine', 'reject']:
                    errors.append(DmarcErrors.INVALID_POLICY)
            elif tag == 'pct':
                pct_value = 100
                try:
                    pct_value = int(value)
                except ValueError:
                    pass
                if 0 < pct_value < 100:
                    errors.append(DmarcErrors.INVALID_PCT_VALUE)
                dmarc_record_values['pct'] = pct_value
            elif tag == 'rf':
                dmarc_record_values['rf'] = value
                values = value.split(';')
                for rf in values:
                    if rf != 'afrf':
                        errors.append(DmarcErrors.INVALID_RF_VALUE)
            elif tag == 'ri':
                try:
                    ri = int(value)
                except ValueError:
                    errors.append(DmarcErrors.INVALID_RI_VALUE)
                    continue
                if ri < 0 or ri > 4294967295:
                    errors.append(DmarcErrors.INVALID_RI_VALUE)

                dmarc_record_values['ri'] = ri

            elif tag == 'rua':
                dmarc_record_values['rua'] = value
                if not has_rua:
                    has_rua = True
            elif tag == 'ruf':
                dmarc_record_values['ruf'] = value
                if not has_ruf:
                    has_ruf = True
            elif tag == 'sp':
                dmarc_record_values['sp'] = value
                if value not in ['none', 'quarantine', 'reject']:
                    errors.append(DmarcErrors.INVALID_SUBDOMAIN_POLICY)
            elif tag == 'v':
                dmarc_record_values['v'] = value
            else:
                dmarc_record_values[tag] = value
    else:
        errors.append(DmarcErrors.INVALID_DMARC_RECORD_START)
        dmarc_record_values['errors'] = errors
        return dmarc_record_values

    dmarc_record_values['errors'] = errors
    return dmarc_record_values