saltstack/salt

View on GitHub
salt/utils/rsax931.py

Summary

Maintainability
D
1 day
Test Coverage
# -*- coding: utf-8 -*-
'''
Create and verify ANSI X9.31 RSA signatures using OpenSSL libcrypto
'''

# Import Python libs
from __future__ import absolute_import, print_function, unicode_literals
import glob
import sys
import os

# Import Salt libs
import salt.utils.platform
import salt.utils.stringutils

# Import 3rd-party libs
from ctypes import cdll, c_char_p, c_int, c_void_p, pointer, create_string_buffer
from ctypes.util import find_library

# Constants taken from openssl-1.1.0c/include/openssl/crypto.h
# NOTE(review): presumably option flags for OPENSSL_init_crypto(); nothing in
# the visible part of this module references them -- confirm before removing.
OPENSSL_INIT_ADD_ALL_CIPHERS = 0x00000004
OPENSSL_INIT_ADD_ALL_DIGESTS = 0x00000008
OPENSSL_INIT_NO_LOAD_CONFIG = 0x00000080


def _load_libcrypto():
    '''
    Locate the OpenSSL libcrypto shared library and load it through ctypes

    The lookup order is: the Windows ``libeay32`` DLL, a ``libcrypto.so*``
    shipped next to a frozen SmartOS executable, ``find_library('crypto')``,
    and finally a handful of well-known SunOS and AIX directories.

    :return: The loaded library handle
    :raises OSError: If no libcrypto could be located
    '''
    if sys.platform.startswith('win'):
        # cdll.LoadLibrary on windows requires an 'str' argument
        return cdll.LoadLibrary(str('libeay32'))  # future lint: disable=blacklisted-function

    if getattr(sys, 'frozen', False) and salt.utils.platform.is_smartos():
        # Frozen SmartOS build: use the library sitting beside the executable
        bundled = os.path.join(os.path.dirname(sys.executable), 'libcrypto.so*')
        return cdll.LoadLibrary(glob.glob(bundled)[0])

    path = find_library('crypto')

    if not path and sys.platform.startswith('sunos5'):
        # ctypes.util.find_library defaults to 32 bit library path on sunos5,
        # test for 64 bit python execution
        path = find_library('crypto', sys.maxsize > 2**32)

    if not path and salt.utils.platform.is_sunos():
        # Solaris-like distributions that use pkgsrc (SmartOS, OmniOS,
        # OpenIndiana, ...) keep libraries in a non standard location:
        # /opt/local/lib (non-Global Zone) or /opt/tools/lib (Global Zone).
        candidates = glob.glob('/opt/local/lib/libcrypto.so*') + glob.glob('/opt/tools/lib/libcrypto.so*')
        path = candidates[0] if candidates else None

    if not path and salt.utils.platform.is_aix():
        if os.path.isdir('/opt/salt/lib'):
            # preference for Salt installed fileset
            pattern = '/opt/salt/lib/libcrypto.so*'
        else:
            pattern = '/opt/freeware/lib/libcrypto.so*'
        candidates = glob.glob(pattern)
        path = candidates[0] if candidates else None

    if not path:
        raise OSError('Cannot locate OpenSSL libcrypto')
    return cdll.LoadLibrary(path)


def _init_libcrypto():
    '''
    Set up libcrypto argtypes and initialize the library

    :return: The loaded libcrypto handle, with argument and return types
        declared for the RSA, BIO and PEM functions used by this module
    '''
    libcrypto = _load_libcrypto()

    try:
        # If we're greater than OpenSSL 1.1.0, no need to do the init
        openssl_version_num = libcrypto.OpenSSL_version_num
        if callable(openssl_version_num):
            openssl_version_num = openssl_version_num()
        # NOTE(review): OPENSSL_init_crypto is invoked without arguments here;
        # check against the OpenSSL prototype whether option flags should be
        # passed, and whether this branch is reachable at all when
        # OpenSSL_version_num exists.
        if openssl_version_num < 0x10100000:
            libcrypto.OPENSSL_init_crypto()
    except AttributeError:
        # Support for OpenSSL < 1.1 (OPENSSL_API_COMPAT < 0x10100000L)
        libcrypto.OPENSSL_no_config()
        libcrypto.OPENSSL_add_all_algorithms_noconf()

    # Declare prototypes for every function the module calls. Functions that
    # return a pointer get an explicit c_void_p restype, since ctypes assumes
    # a C int return value by default.
    libcrypto.RSA_new.argtypes = ()
    libcrypto.RSA_new.restype = c_void_p
    libcrypto.RSA_free.argtypes = (c_void_p, )
    # The attribute must be spelled ``argtypes`` and hold a tuple; a
    # misspelled ``argtype`` is silently ignored by ctypes.
    libcrypto.RSA_size.argtypes = (c_void_p, )
    libcrypto.BIO_new_mem_buf.argtypes = (c_char_p, c_int)
    libcrypto.BIO_new_mem_buf.restype = c_void_p
    libcrypto.BIO_free.argtypes = (c_void_p, )
    libcrypto.PEM_read_bio_RSAPrivateKey.argtypes = (c_void_p, c_void_p, c_void_p, c_void_p)
    libcrypto.PEM_read_bio_RSAPrivateKey.restype = c_void_p
    libcrypto.PEM_read_bio_RSA_PUBKEY.argtypes = (c_void_p, c_void_p, c_void_p, c_void_p)
    libcrypto.PEM_read_bio_RSA_PUBKEY.restype = c_void_p
    libcrypto.RSA_private_encrypt.argtypes = (c_int, c_char_p, c_char_p, c_void_p, c_int)
    libcrypto.RSA_public_decrypt.argtypes = (c_int, c_char_p, c_char_p, c_void_p, c_int)

    return libcrypto


# Load and initialize libcrypto once, at import time. The signer and verifier
# classes below call into this module-level handle.
libcrypto = _init_libcrypto()

# Padding mode passed to RSA_private_encrypt / RSA_public_decrypt below.
# openssl/rsa.h:#define RSA_X931_PADDING 5
RSA_X931_PADDING = 5


class RSAX931Signer(object):
    '''
    Create ANSI X9.31 RSA signatures using OpenSSL libcrypto
    '''
    def __init__(self, keydata):
        '''
        Init an RSAX931Signer instance

        :param str keydata: The RSA private key in PEM format
        :raises ValueError: If libcrypto cannot read an RSA private key
            from ``keydata``
        '''
        # Pre-set both handles so that __del__ stays safe even when one of
        # the calls below raises before the real values are assigned.
        self._bio = None
        self._rsa = None
        keydata = salt.utils.stringutils.to_bytes(keydata, 'ascii')
        self._bio = libcrypto.BIO_new_mem_buf(keydata, len(keydata))
        self._rsa = c_void_p(libcrypto.RSA_new())
        if not libcrypto.PEM_read_bio_RSAPrivateKey(self._bio, pointer(self._rsa), None, None):
            raise ValueError('invalid RSA private key')

    def __del__(self):
        '''
        Release the BIO and RSA handles owned by this instance, if any
        '''
        # getattr guards cover instances whose __init__ never ran; clearing
        # the attributes first makes a repeated call a no-op instead of a
        # double free.
        bio = getattr(self, '_bio', None)
        rsa = getattr(self, '_rsa', None)
        self._bio = None
        self._rsa = None
        if bio is not None:
            libcrypto.BIO_free(bio)
        if rsa is not None:
            libcrypto.RSA_free(rsa)

    def sign(self, msg):
        '''
        Sign a message (digest) using the private key

        :param str msg: The message (digest) to sign
        :rtype: bytes
        :return: The signature
        :raises ValueError: If RSA_private_encrypt reports a failure
        '''
        # Allocate a buffer large enough for the signature. Freed by ctypes.
        buf = create_string_buffer(libcrypto.RSA_size(self._rsa))
        msg = salt.utils.stringutils.to_bytes(msg)
        size = libcrypto.RSA_private_encrypt(len(msg), msg, buf, self._rsa, RSA_X931_PADDING)
        # A negative size signals failure; no partial signature is returned.
        if size < 0:
            raise ValueError('Unable to encrypt message')
        return buf[0:size]


class RSAX931Verifier(object):
    '''
    Verify ANSI X9.31 RSA signatures using OpenSSL libcrypto
    '''
    def __init__(self, pubdata):
        '''
        Init an RSAX931Verifier instance

        :param str pubdata: The RSA public key in PEM format
        :raises ValueError: If libcrypto cannot read an RSA public key
            from ``pubdata``
        '''
        # Pre-set both handles so that __del__ stays safe even when one of
        # the calls below raises before the real values are assigned.
        self._bio = None
        self._rsa = None
        pubdata = salt.utils.stringutils.to_bytes(pubdata, 'ascii')
        # Drops every 'RSA ' occurrence; presumably this rewrites a
        # 'BEGIN RSA PUBLIC KEY' header into the 'BEGIN PUBLIC KEY' form
        # expected by PEM_read_bio_RSA_PUBKEY -- TODO confirm.
        pubdata = pubdata.replace(b'RSA ', b'')
        self._bio = libcrypto.BIO_new_mem_buf(pubdata, len(pubdata))
        self._rsa = c_void_p(libcrypto.RSA_new())
        if not libcrypto.PEM_read_bio_RSA_PUBKEY(self._bio, pointer(self._rsa), None, None):
            raise ValueError('invalid RSA public key')

    def __del__(self):
        '''
        Release the BIO and RSA handles owned by this instance, if any
        '''
        # getattr guards cover instances whose __init__ never ran; clearing
        # the attributes first makes a repeated call a no-op instead of a
        # double free.
        bio = getattr(self, '_bio', None)
        rsa = getattr(self, '_rsa', None)
        self._bio = None
        self._rsa = None
        if bio is not None:
            libcrypto.BIO_free(bio)
        if rsa is not None:
            libcrypto.RSA_free(rsa)

    def verify(self, signed):
        '''
        Recover the message (digest) from the signature using the public key

        :param str signed: The signature created with the private key
        :rtype: bytes
        :return: The message (digest) recovered from the signature
        :raises ValueError: If RSA_public_decrypt reports a failure
        '''
        # Allocate a buffer large enough for the recovered message. Freed by ctypes.
        buf = create_string_buffer(libcrypto.RSA_size(self._rsa))
        signed = salt.utils.stringutils.to_bytes(signed)
        size = libcrypto.RSA_public_decrypt(len(signed), signed, buf, self._rsa, RSA_X931_PADDING)
        # A negative size signals failure; no partial message is returned.
        if size < 0:
            raise ValueError('Unable to decrypt message')
        return buf[0:size]