Varbin/pep272-encryption

View on GitHub
src/pep272_encryption/__init__.py

Summary

Maintainability
B
5 hrs
Test Coverage
A
96%
#!python3
"""\
This module creates PEP-272 cipher object classes for building block ciphers
in python.

To use, inherit the PEP272Cipher and overwrite
``encrypt_block`` or ``decrypt_block(self, key, string, **kwargs)`` methods
and set the block_size attribute.


Example:

::

 class YourCipher(PEP272Cipher):
     block_size=8

     def encrypt_block(self, key, string, **kwargs):
         ...

     def decrypt_block(self, key_string, **kwargs):
         ...


"""

from abc import abstractmethod

try:
    from abc import ABC
except ImportError:
    from abc import ABCMeta
    ABC = ABCMeta('ABC', (object,), {})

try:
    from collections.abc import Mapping
except ImportError:
    from collections import Mapping

from .util import xor_strings, b_chr, b_ord, split_blocks, Counter
from .version import *  # noqa


MODE_ECB = 1  #:
MODE_CBC = 2  #:
MODE_CFB = 3  #:
MODE_PGP = 4  #:
MODE_OFB = 5  #:
MODE_CTR = 6  #:


class PEP272Cipher(ABC):
    """
    A cipher class as defined in PEP-272_.

    :param bytes key: The symmetric key to use for encryption.

    :param int mode: The mode of operation to use.
            For valid values see Reference/:ref:`api-modes`.

    :param bytes IV: A unique bytestring with once the block size in
            length. For security reasons it should be unpredictable and must
            never be used twice for the same key.
            Required for *CBC*, *CFB* and *OFB* mode of operation.


    :param \\**kwargs: See below.

    Depending on the blockcipher mode of operation one or multiple of the
    following arguments must be passed depending on the mode of operation.

    :Keyword arguments:

        *
            **iv** (`bytes`) -- Alternative name for **IV**

        *
            **segment_size** (`int`): The segment size for one encryption
            "segment" of CFB mode in bits. It must be multiple of 8
            (only byte-sized operations are allowed) and the maximum size is
            the block size * 8. Required for *CFB* mode of operation.

        *
            **counter** (`callable`):
            A callable object returning *block size* bytes or a counter
            from :py:mod:`Crypto.Util.Counter`. For security reasons the
            counter output must **never** repeat. Required for *CTR* mode.

        *
            Additional keyword arguments are passed to the underlying block
            cipher implementation as kwargs.


    .. versionchanged:: 0.4
        *IV* can be used as a positional argument.

    .. versionchanged:: 0.4
       PyCryptodome counters are accepted for *counter* in addition to
       to callables.


    .. _PEP-272: https://www.python.org/dev/peps/pep-0272/

    """

    block_size = NotImplemented

    @property
    def IV(self):
        if self.mode in (MODE_ECB, MODE_CTR):
            return None
        else:
            return self._status

    @IV.setter
    def IV(self, value):
        raise AttributeError("This property is read-only, and cannot be "
                             "assigned a new value")

    def __init__(self, key, mode, IV=None, **kwargs):
        "A cipher class as defined in PEP-272"
        if not key:
            raise ValueError("'key' cannot have a length of 0")

        self.key = key
        self.mode = mode
        self._status = IV or kwargs.pop('iv', None)

        self.segment_size = kwargs.pop('segment_size', -1)
        self._counter = kwargs.pop('counter', None)

        self.kwargs = kwargs

        self._check_arguments()
        self._keystream = self._create_keystream()

    def _check_iv(self):
        if self._status is None:
            raise TypeError("For CBC, CFB, PGP and OFB mode an IV is "
                            "required.")
        if len(self._status) != self.block_size and self.mode != MODE_PGP:
            raise ValueError("'IV' length must be block_size ({})".format(
                self.block_size))
        elif self.mode == MODE_PGP:
            if not len(self._status) in (self.block_size,
                                         self.block_size + 2):
                raise ValueError(
                    ("'IV' length must be block_size ({})"
                     "or blocksize + 2".format(self.block_size)))

    def _check_segment_size(self):
        if not self.segment_size:
            raise TypeError("missing required positional argument for CFB:"
                            " 'segment_size'")

        if self.segment_size < 0:
            self.segment_size = 8

        if not (8 <= self.segment_size <= self.block_size * 8) or (
                self.segment_size % 8):
            raise TypeError("segment_size must be between 8 and "
                            "block_size*8 and a multiple of 8")

    def _check_counter(self):
        if self._counter is None:
            raise TypeError(
                "missing required positional argument for CTR:"
                " 'counter'")

        if callable(self._counter):
            return

        if isinstance(self._counter, Mapping):
            counter = self._counter

            self._counter = Counter(
                nonce=counter['prefix'],
                initial_value=counter['initial_value'],
                suffix=counter['suffix'],
                block_size=counter['counter_len'],
                endian=["big", "little"][counter["little_endian"]]
            )

            return

        raise TypeError("counter must be a callable, it is not")

    def _check_arguments(self):
        """
        Checks if all required keyword arguments have been set.

        Tests for:
            - IV when using MODE_CBC, MODE_CFB, MODE_OFB
            - callable counter with MODE_CTR
        """
        if self.mode in (MODE_CBC, MODE_CFB, MODE_OFB, MODE_PGP):
            self._check_iv()

        if self.mode == MODE_CFB:
            self._check_segment_size()

        if self.mode == MODE_CTR:
            self._check_counter()

    def encrypt(self, string):
        """Encrypt data with the key and the parameters set at initialization.

        The cipher object is stateful; encryption of a long block
        of data can be broken up in two or more calls to `encrypt()`.
        That is, the statement:

            >>> c.encrypt(a) + c.encrypt(b)

        is always equivalent to:

             >>> c.encrypt(a+b)

        That also means that you cannot reuse an object for encrypting
        or decrypting other data with the same key.

        This function does not perform any padding.

         - For `MODE_ECB`, `MODE_CBC` *string* length
           (in bytes) must be a multiple of *block_size*.

         - For `MODE_CFB`, *string* length (in bytes) must be a multiple
           of *segment_size*/8.

         - For `MODE_CTR` and `MODE_OFB`, *string* can be of any length.

        :param bytes string: The piece of data to encrypt.
        :raises ValueError:
            When a mode of operation has be requested this code cannot handle.
        :raises ValueError:
            When len(string) has a wrong length, as described above.
        :raises TypeError:
            When the counter callable in CTR returns data with the wrong
            length.

        :return:
            The encrypted data, as a byte string. It is as long as
            *string*.
        :rtype: bytes
        """
        if self.mode in (MODE_OFB, MODE_CTR):
            return self._encrypt_with_keystream(string)

        if self.mode == MODE_CFB:
            return self._encrypt_cfb(string)

        if self.mode not in (MODE_ECB, MODE_CBC):
            raise ValueError("Unknown mode of operation")

        out = []

        for block in split_blocks(string, self.block_size):
            if self.mode == MODE_ECB:
                ecd = self.encrypt_block(self.key, block, **self.kwargs)
            else:  # self.mode == MODE_CBC
                xored = xor_strings(self._status, block)
                ecd = self._status = self.encrypt_block(self.key, xored,
                                                        **self.kwargs)

            out.append(ecd)

        return b"".join(out)

    def decrypt(self, string):
        """Decrypt data with the key and the parameters set at initialization.

        The cipher object is stateful; decryption of a long block
        of data can be broken up in two or more calls to `decrypt()`.
        That is, the statement:

            >>> c.decrypt(a) + c.decrypt(b)

        is always equivalent to:

             >>> c.decrypt(a+b)

        That also means that you cannot reuse an object for encrypting
        or decrypting other data with the same key.

        This function does not perform any padding.

         - For `MODE_ECB`, `MODE_CBC` *string* length
           (in bytes) must be a multiple of *block_size*.

         - For `MODE_CFB`, *string* length (in bytes) must be a multiple
           of *segment_size*/8.

         - For `MODE_CTR` and `MODE_OFB`, *string* can be of any length.

        :param bytes string: The piece of data to decrypt.
        :raises ValueError:
            When a mode of operation has be requested this code cannot handle.
        :raises ValueError:
            When len(string) has a wrong length, as described above.
        :raises TypeError:
            When the counter in CTR returns data of the wrong length.

        :return:
            The decrypted data, as a byte string. It is as long as
            *string*.
        :rtype: bytes
        """
        if self.mode in (MODE_OFB, MODE_CTR):
            return self.encrypt(string)

        if self.mode == MODE_CFB:
            return self._encrypt_cfb(string, True)

        if self.mode not in (MODE_ECB, MODE_CBC):
            raise ValueError("Unknown mode of operation")

        out = []

        for block in split_blocks(string, self.block_size):
            if self.mode == MODE_ECB:
                dec = self.decrypt_block(self.key, block, **self.kwargs)
            else:  # self.mode == MODE_CBC
                decrypted_but_not_xored = self.decrypt_block(self.key,
                                                             block,
                                                             **self.kwargs)
                dec = xor_strings(self._status, decrypted_but_not_xored)
                self._status = block

            out.append(dec)

        return b"".join(out)

    @abstractmethod
    def encrypt_block(self, key, block, **kwargs):
        """Dummy function for the encryption of a single block.
        Overwrite with 'real' encryption function.

        :param bytes key: The symmetric encryption key.
        :param bytes block: A single plaintext block to encrypt.
        :param \\**kwargs: Additional parameters passed to `__init__`.

        :raises NotImplementedError: This method is to be overridden.

        :returns: ciphertext block
        :rtype: bytes"""
        raise NotImplementedError

    @abstractmethod
    def decrypt_block(self, key, block, **kwargs):
        """Dummy function for the decryption of a single block.
        Overwrite with 'real' deryption function.

        :param bytes key: The symmetric encryption key.
        :param bytes block: A single ciphertext block to encrypt.
        :param \\**kwargs: Additional parameters passed to `__init__`.

        :raises NotImplementedError: This method is to be overridden.

        :returns: plaintext block
        :rtype: bytes"""
        raise NotImplementedError

    def _encrypt_with_keystream(self, data):
        """Encrypts data with the set keystream."""
        xor = [x ^ y for (x, y) in zip(map(b_ord, data),
                                       self._keystream)]
        return bytes(bytearray(xor))  # Faster

    def _encrypt_cfb(self, data, decrypt=False):
        """Encrypts data in CFB mode."""
        out = []

        for block in split_blocks(data, self.segment_size // 8):
            encrypted_iv = self.encrypt_block(self.key, self._status)
            ecd = xor_strings(encrypted_iv, block)

            iv_p1 = self._status[self.segment_size // 8:]
            iv_p2 = block if decrypt else ecd

            self._status = iv_p1 + iv_p2

            out.append(ecd)

        return b"".join(out)

    def _create_keystream(self):
        "Creates a keystream (generator object) for OFB or CTR mode."
        if self.mode not in (MODE_OFB, MODE_CTR):
            # Coverage somehow does not work here
            return  # pragma: no cover

        while True:
            if self.mode == MODE_OFB:
                _next = self._status
            elif self.mode == MODE_CTR:
                _next = self._counter()
                if len(_next) != self.block_size:
                    raise TypeError("Counter length must be block_size")

            self._status = self.encrypt_block(self.key, _next, **self.kwargs)

            for k in self._status:
                yield b_ord(k)