DivisionBy-Zero/erpa-sweng

View on GitHub
server/crypto.py

Summary

Maintainability
A
0 mins
Test Coverage
from functools import wraps
import base64
import inspect
import ed25519

from models import UserAuth


def non_null_params(f):
    @wraps(f)
    def fn(*args):
        for i in range(1, len(args)):
            if args[i] is None:
                argname = inspect.getfullargspec(f).args[i]
                raise ValueError('{} cannot be null'.format(argname))
        return f(*args)
    return f


class CryptoGrenouille:
    # OID 1.3.101.xxx: https://tools.ietf.org/html/draft-ietf-curdle-pkix-04
    OID_ED25519 = 112
    OID_BYTE = 8
    IDLEN_BYTE = 3

    def _get_verifying_key_from_user_auth(self, user_auth: UserAuth):
        x509b = base64.b64decode(user_auth.public_key)
        if x509b[self.OID_BYTE] != self.OID_ED25519:
            raise ValueError('Grenouille authentication requires an x509 Ed25519 Certificate')
        total_length = 46 if x509b[self.IDLEN_BYTE] == 7 else 44
        id_length = 7 if x509b[self.IDLEN_BYTE] == 7 else 5
        if len(x509b) != total_length:
            raise ValueError('Malformed x509 message: Expecting {} bytes, actual {}'.format(total_length, len(x509b)))
        # Magic numbers extracted from the java implementation of ed25519: str4d/ed25519-java
        # https://github.com/str4d/ed25519-java/blob/master/src/net/i2p/crypto/eddsa/EdDSAPublicKey.java
        expected_header = bytes([0x30, (total_length - 2), 0x30, id_length, 0x06, 3, (1 * 40) + 3, 101])
        if x509b[0:len(expected_header)] != expected_header:
            raise ValueError('Malformed x509 message: Malformed header')
        header_scan_idx = len(expected_header) + 1  # +1: OID_BYTE, which we already checked
        if id_length == 7:
            if x509b[header_scan_idx:header_scan_idx + 2] != bytes([0x05, 0]):
                raise ValueError('Unsupported x509 key spec')
            header_scan_idx = header_scan_idx + 2
        if x509b[header_scan_idx:header_scan_idx + 3] != bytes([0x03, 33, 0]):
            raise ValueError('Unsupported x509 key spec')
        header_scan_idx = header_scan_idx + 3
        return ed25519.VerifyingKey(x509b[header_scan_idx:])

    @non_null_params
    def validate_user_auth(self, user_auth: UserAuth):
        self._get_verifying_key_from_user_auth(user_auth)

    @non_null_params
    def verify_message_signature(self, b64_signature: str, message: str,
                                 user_auth: UserAuth) -> bool:
        verifying_key = self._get_verifying_key_from_user_auth(user_auth)
        signature = base64.b64decode(b64_signature)
        try:
            verifying_key.verify(signature, message.encode('UTF-8'))
        except ed25519.BadSignatureError:
            return False
        return True


strategy_engines = {
    'Grenouille': CryptoGrenouille()
}


def with_engine_from_last_argument(f):
    @wraps(f)
    def fn(*args):
        user_auth = args[-1]
        strategy = user_auth.authentication_strategy
        engine = strategy_engines[strategy]
        if not engine:
            msg = 'Cannot proceed with {}-type crypto'.format(strategy)
            raise ValueError(msg)
        return f(*(args + (engine,)))
    return fn


class Crypto:
    @non_null_params
    @with_engine_from_last_argument
    def validate_user_auth(self, user_auth: UserAuth, engine):
        return engine.validate_user_auth(user_auth)

    @non_null_params
    @with_engine_from_last_argument
    def verify_message_signature(self, b64_signature: str, message: str,
                                 user_auth: UserAuth, engine) -> bool:
        return engine.verify_message_signature(b64_signature, message, user_auth)