tlsfuzzer/tlslite-ng

View on GitHub
tlslite/ocsp.py

Summary

Maintainability
A
40 mins
Test Coverage
A
94%
"""Class for handling primary OCSP responses"""

from .utils.asn1parser import ASN1Parser
from .utils.cryptomath import bytesToNumber, numBytes, secureHash
from .x509 import X509
from .signed import SignedObject
from .errors import TLSIllegalParameterException

class OCSPRespStatus(object):
    """ OCSP response status codes (RFC 2560) """
    successful = 0
    malformedRequest = 1
    internalError = 2
    tryLater = 3    # 4 is not used to match RFC2560 specification
    sigRequired = 5
    unauthorized = 6


class CertStatus(object):
    """ Certificate status in an OCSP response """
    good, revoked, unknown = range(3)


class SingleResponse(object):
    """ This class represents SingleResponse ASN1 type (defined in RFC2560) """
    def __init__(self, value):
        self.value = value
        self.cert_hash_alg = None
        self.cert_issuer_name_hash = None
        self.cert_issuer_key_hash = None
        self.cert_serial_num = None
        self.cert_status = None
        self.this_update = None
        self.next_update = None
        self.parse(value)

    _hash_algs_OIDs = {
        tuple([0x2a, 0x86, 0x48, 0x86, 0xf7, 0xd, 0x2, 0x5]): 'md5',
        tuple([0x2b, 0xe, 0x3, 0x2, 0x1a]): 'sha1',
        tuple([0x60, 0x86, 0x48, 0x1, 0x65, 0x3, 0x4, 0x2, 0x4]): 'sha224',
        tuple([0x60, 0x86, 0x48, 0x1, 0x65, 0x3, 0x4, 0x2, 0x1]): 'sha256',
        tuple([0x60, 0x86, 0x48, 0x1, 0x65, 0x3, 0x4, 0x2, 0x2]): 'sha384',
        tuple([0x60, 0x86, 0x48, 0x1, 0x65, 0x3, 0x4, 0x2, 0x3]): 'sha512'
    }

    def parse(self, value):
        cert_id = value.getChild(0)
        self.cert_hash_alg = cert_id.getChild(0).getChild(0).value
        self.cert_issuer_name_hash = cert_id.getChild(1).value
        self.cert_issuer_key_hash = cert_id.getChild(2).value
        self.cert_serial_num = bytesToNumber(cert_id.getChild(3).value)
        self.cert_status = value.getChild(1).value
        self.this_update = value.getChild(2).value
        # next_update is optional
        try:
            fld = value.getChild(3)
            if fld.type.tag_id == 0:
                self.next_update = fld.value
        except SyntaxError:
            self.next_update = None

    def verify_cert_match(self, server_cert, issuer_cert):
        # extact subject public key
        issuer_key = issuer_cert.subject_public_key

        # extract issuer DN
        issuer_name = issuer_cert.subject

        try:
            alg = self._hash_algs_OIDs[tuple(self.cert_hash_alg)]
        except KeyError as e:
            raise TLSIllegalParameterException("Unknown hash algorithm: {0}".format(
                            list(self.cert_hash_alg)))

        # hash issuer key
        hashed_key = secureHash(issuer_key, alg)
        if hashed_key != self.cert_issuer_key_hash:
            raise ValueError("Could not verify certificate public key")

        # hash issuer name
        hashed_name = secureHash(issuer_name, alg)
        if hashed_name != self.cert_issuer_name_hash:
            raise ValueError("Could not verify certificate DN")

        # serial number
        if server_cert.serial_number != self.cert_serial_num:
            raise ValueError("Could not verify certificate serial number")
        return True


class OCSPResponse(SignedObject):
    """ This class represents an OCSP response. """
    def __init__(self, value):
        super(OCSPResponse, self).__init__()
        self.bytes = None
        self.resp_status = None
        self.resp_type = None
        self.version = None
        self.resp_id = None
        self.produced_at = None
        self.responses = []
        self.certs = []
        self.parse(value)

    def parse(self, value):
        """
        Parse a DER-encoded OCSP response.

        :type value: stream of bytes
        :param value: An DER-encoded OCSP response
        """
        self.bytes = bytearray(value)
        parser = ASN1Parser(self.bytes)
        resp_status = parser.getChild(0)
        self.resp_status = resp_status.value[0]
        # if the response status is not successsful, abort parsing other fields
        if self.resp_status != OCSPRespStatus.successful:
            return self
        resp_bytes = parser.getChild(1).getChild(0)
        self.resp_type = resp_bytes.getChild(0).value
        response = resp_bytes.getChild(1)
        # check if response is id-pkix-ocsp-basic
        if list(self.resp_type) != [43, 6, 1, 5, 5, 7, 48, 1, 1]:
            raise SyntaxError()
        basic_resp = response.getChild(0)
        # parsing tbsResponseData fields
        self._tbsdataparse(basic_resp.getChild(0))
        self.tbs_data = basic_resp.getChildBytes(0)
        self.signature_alg = basic_resp.getChild(1).getChild(0).value
        self.signature = basic_resp.getChild(2).value
        # test if certs field is present
        if basic_resp.getChildCount() > 3:
            certs = basic_resp.getChild(3)
            cnt = certs.getChildCount()
            for i in range(cnt):
                certificate = X509()
                certificate.parseBinary(certs.getChild(i).value)
                self.certs.append(certificate)
        return self

    def _tbsdataparse(self, value):
        """
        Parse to be signed data,

        :type value: stream of bytes
        :param value: TBS data
        """
        # test if version is ommited
        field = value.getChild(0)
        cnt = 0
        if field.type.tag_id == 0:
            # version is not omitted
            cnt += 1
            self.version = field.value
        else:
            self.version = 1
        self.resp_id = value.getChild(cnt).value
        self.produced_at = value.getChild(cnt+1).value
        responses = value.getChild(cnt+2)
        resp_cnt = responses.getChildCount()
        for i in range(resp_cnt):
            resp = responses.getChild(i)
            parsed_resp = SingleResponse(resp)
            self.responses.append(parsed_resp)