
View on GitHub


40 mins
Test Coverage
"""Class for handling primary OCSP responses"""

from .utils.asn1parser import ASN1Parser
from .utils.cryptomath import bytesToNumber, numBytes, secureHash
from .x509 import X509
from .signed import SignedObject
from .errors import TLSIllegalParameterException

class OCSPRespStatus(object):
    """ OCSP response status codes (RFC 2560) """
    successful = 0
    malformedRequest = 1
    internalError = 2
    tryLater = 3    # 4 is not used to match RFC2560 specification
    sigRequired = 5
    unauthorized = 6

class CertStatus(object):
    """ Certificate status in an OCSP response """
    good, revoked, unknown = range(3)

class SingleResponse(object):
    """ This class represents SingleResponse ASN1 type (defined in RFC2560) """
    def __init__(self, value):
        self.value = value
        self.cert_hash_alg = None
        self.cert_issuer_name_hash = None
        self.cert_issuer_key_hash = None
        self.cert_serial_num = None
        self.cert_status = None
        self.this_update = None
        self.next_update = None

    _hash_algs_OIDs = {
        tuple([0x2a, 0x86, 0x48, 0x86, 0xf7, 0xd, 0x2, 0x5]): 'md5',
        tuple([0x2b, 0xe, 0x3, 0x2, 0x1a]): 'sha1',
        tuple([0x60, 0x86, 0x48, 0x1, 0x65, 0x3, 0x4, 0x2, 0x4]): 'sha224',
        tuple([0x60, 0x86, 0x48, 0x1, 0x65, 0x3, 0x4, 0x2, 0x1]): 'sha256',
        tuple([0x60, 0x86, 0x48, 0x1, 0x65, 0x3, 0x4, 0x2, 0x2]): 'sha384',
        tuple([0x60, 0x86, 0x48, 0x1, 0x65, 0x3, 0x4, 0x2, 0x3]): 'sha512'

    def parse(self, value):
        cert_id = value.getChild(0)
        self.cert_hash_alg = cert_id.getChild(0).getChild(0).value
        self.cert_issuer_name_hash = cert_id.getChild(1).value
        self.cert_issuer_key_hash = cert_id.getChild(2).value
        self.cert_serial_num = bytesToNumber(cert_id.getChild(3).value)
        self.cert_status = value.getChild(1).value
        self.this_update = value.getChild(2).value
        # next_update is optional
            fld = value.getChild(3)
            if fld.type.tag_id == 0:
                self.next_update = fld.value
        except SyntaxError:
            self.next_update = None

    def verify_cert_match(self, server_cert, issuer_cert):
        # extact subject public key
        issuer_key = issuer_cert.subject_public_key

        # extract issuer DN
        issuer_name = issuer_cert.subject

            alg = self._hash_algs_OIDs[tuple(self.cert_hash_alg)]
        except KeyError as e:
            raise TLSIllegalParameterException("Unknown hash algorithm: {0}".format(

        # hash issuer key
        hashed_key = secureHash(issuer_key, alg)
        if hashed_key != self.cert_issuer_key_hash:
            raise ValueError("Could not verify certificate public key")

        # hash issuer name
        hashed_name = secureHash(issuer_name, alg)
        if hashed_name != self.cert_issuer_name_hash:
            raise ValueError("Could not verify certificate DN")

        # serial number
        if server_cert.serial_number != self.cert_serial_num:
            raise ValueError("Could not verify certificate serial number")
        return True

class OCSPResponse(SignedObject):
    """ This class represents an OCSP response. """
    def __init__(self, value):
        super(OCSPResponse, self).__init__()
        self.bytes = None
        self.resp_status = None
        self.resp_type = None
        self.version = None
        self.resp_id = None
        self.produced_at = None
        self.responses = []
        self.certs = []

    def parse(self, value):
        Parse a DER-encoded OCSP response.

        :type value: stream of bytes
        :param value: An DER-encoded OCSP response
        self.bytes = bytearray(value)
        parser = ASN1Parser(self.bytes)
        resp_status = parser.getChild(0)
        self.resp_status = resp_status.value[0]
        # if the response status is not successsful, abort parsing other fields
        if self.resp_status != OCSPRespStatus.successful:
            return self
        resp_bytes = parser.getChild(1).getChild(0)
        self.resp_type = resp_bytes.getChild(0).value
        response = resp_bytes.getChild(1)
        # check if response is id-pkix-ocsp-basic
        if list(self.resp_type) != [43, 6, 1, 5, 5, 7, 48, 1, 1]:
            raise SyntaxError()
        basic_resp = response.getChild(0)
        # parsing tbsResponseData fields
        self.tbs_data = basic_resp.getChildBytes(0)
        self.signature_alg = basic_resp.getChild(1).getChild(0).value
        self.signature = basic_resp.getChild(2).value
        # test if certs field is present
        if basic_resp.getChildCount() > 3:
            certs = basic_resp.getChild(3)
            cnt = certs.getChildCount()
            for i in range(cnt):
                certificate = X509()
        return self

    def _tbsdataparse(self, value):
        Parse to be signed data,

        :type value: stream of bytes
        :param value: TBS data
        # test if version is ommited
        field = value.getChild(0)
        cnt = 0
        if field.type.tag_id == 0:
            # version is not omitted
            cnt += 1
            self.version = field.value
            self.version = 1
        self.resp_id = value.getChild(cnt).value
        self.produced_at = value.getChild(cnt+1).value
        responses = value.getChild(cnt+2)
        resp_cnt = responses.getChildCount()
        for i in range(resp_cnt):
            resp = responses.getChild(i)
            parsed_resp = SingleResponse(resp)