
View on GitHub


5 hrs
Test Coverage
This module creates PEP-272 cipher object classes for building block ciphers
in python.

To use, inherit the PEP272Cipher and overwrite
``encrypt_block`` or ``decrypt_block(self, key, string, **kwargs)`` methods
and set the block_size attribute.



 class YourCipher(PEP272Cipher):

     def encrypt_block(self, key, string, **kwargs):

     def decrypt_block(self, key_string, **kwargs):


from abc import abstractmethod

    from abc import ABC
except ImportError:
    from abc import ABCMeta
    ABC = ABCMeta('ABC', (object,), {})

    from import Mapping
except ImportError:
    from collections import Mapping

from .util import xor_strings, b_chr, b_ord, split_blocks, Counter
from .version import *  # noqa

MODE_ECB = 1  #:
MODE_CBC = 2  #:
MODE_CFB = 3  #:
MODE_PGP = 4  #:
MODE_OFB = 5  #:
MODE_CTR = 6  #:

class PEP272Cipher(ABC):
    A cipher class as defined in PEP-272_.

    :param bytes key: The symmetric key to use for encryption.

    :param int mode: The mode of operation to use.
            For valid values see Reference/:ref:`api-modes`.

    :param bytes IV: A unique bytestring with once the block size in
            length. For security reasons it should be unpredictable and must
            never be used twice for the same key.
            Required for *CBC*, *CFB* and *OFB* mode of operation.

    :param \\**kwargs: See below.

    Depending on the blockcipher mode of operation one or multiple of the
    following arguments must be passed depending on the mode of operation.

    :Keyword arguments:

            **iv** (`bytes`) -- Alternative name for **IV**

            **segment_size** (`int`): The segment size for one encryption
            "segment" of CFB mode in bits. It must be multiple of 8
            (only byte-sized operations are allowed) and the maximum size is
            the block size * 8. Required for *CFB* mode of operation.

            **counter** (`callable`):
            A callable object returning *block size* bytes or a counter
            from :py:mod:`Crypto.Util.Counter`. For security reasons the
            counter output must **never** repeat. Required for *CTR* mode.

            Additional keyword arguments are passed to the underlying block
            cipher implementation as kwargs.

    .. versionchanged:: 0.4
        *IV* can be used as a positional argument.

    .. versionchanged:: 0.4
       PyCryptodome counters are accepted for *counter* in addition to
       to callables.

    .. _PEP-272:


    block_size = NotImplemented

    def IV(self):
        if self.mode in (MODE_ECB, MODE_CTR):
            return None
            return self._status

    def IV(self, value):
        raise AttributeError("This property is read-only, and cannot be "
                             "assigned a new value")

    def __init__(self, key, mode, IV=None, **kwargs):
        "A cipher class as defined in PEP-272"
        if not key:
            raise ValueError("'key' cannot have a length of 0")

        self.key = key
        self.mode = mode
        self._status = IV or kwargs.pop('iv', None)

        self.segment_size = kwargs.pop('segment_size', -1)
        self._counter = kwargs.pop('counter', None)

        self.kwargs = kwargs

        self._keystream = self._create_keystream()

    def _check_iv(self):
        if self._status is None:
            raise TypeError("For CBC, CFB, PGP and OFB mode an IV is "
        if len(self._status) != self.block_size and self.mode != MODE_PGP:
            raise ValueError("'IV' length must be block_size ({})".format(
        elif self.mode == MODE_PGP:
            if not len(self._status) in (self.block_size,
                                         self.block_size + 2):
                raise ValueError(
                    ("'IV' length must be block_size ({})"
                     "or blocksize + 2".format(self.block_size)))

    def _check_segment_size(self):
        if not self.segment_size:
            raise TypeError("missing required positional argument for CFB:"
                            " 'segment_size'")

        if self.segment_size < 0:
            self.segment_size = 8

        if not (8 <= self.segment_size <= self.block_size * 8) or (
                self.segment_size % 8):
            raise TypeError("segment_size must be between 8 and "
                            "block_size*8 and a multiple of 8")

    def _check_counter(self):
        if self._counter is None:
            raise TypeError(
                "missing required positional argument for CTR:"
                " 'counter'")

        if callable(self._counter):

        if isinstance(self._counter, Mapping):
            counter = self._counter

            self._counter = Counter(
                endian=["big", "little"][counter["little_endian"]]


        raise TypeError("counter must be a callable, it is not")

    def _check_arguments(self):
        Checks if all required keyword arguments have been set.

        Tests for:
            - IV when using MODE_CBC, MODE_CFB, MODE_OFB
            - callable counter with MODE_CTR
        if self.mode in (MODE_CBC, MODE_CFB, MODE_OFB, MODE_PGP):

        if self.mode == MODE_CFB:

        if self.mode == MODE_CTR:

    def encrypt(self, string):
        """Encrypt data with the key and the parameters set at initialization.

        The cipher object is stateful; encryption of a long block
        of data can be broken up in two or more calls to `encrypt()`.
        That is, the statement:

            >>> c.encrypt(a) + c.encrypt(b)

        is always equivalent to:

             >>> c.encrypt(a+b)

        That also means that you cannot reuse an object for encrypting
        or decrypting other data with the same key.

        This function does not perform any padding.

         - For `MODE_ECB`, `MODE_CBC` *string* length
           (in bytes) must be a multiple of *block_size*.

         - For `MODE_CFB`, *string* length (in bytes) must be a multiple
           of *segment_size*/8.

         - For `MODE_CTR` and `MODE_OFB`, *string* can be of any length.

        :param bytes string: The piece of data to encrypt.
        :raises ValueError:
            When a mode of operation has be requested this code cannot handle.
        :raises ValueError:
            When len(string) has a wrong length, as described above.
        :raises TypeError:
            When the counter callable in CTR returns data with the wrong

            The encrypted data, as a byte string. It is as long as
        :rtype: bytes
        if self.mode in (MODE_OFB, MODE_CTR):
            return self._encrypt_with_keystream(string)

        if self.mode == MODE_CFB:
            return self._encrypt_cfb(string)

        if self.mode not in (MODE_ECB, MODE_CBC):
            raise ValueError("Unknown mode of operation")

        out = []

        for block in split_blocks(string, self.block_size):
            if self.mode == MODE_ECB:
                ecd = self.encrypt_block(self.key, block, **self.kwargs)
            else:  # self.mode == MODE_CBC
                xored = xor_strings(self._status, block)
                ecd = self._status = self.encrypt_block(self.key, xored,


        return b"".join(out)

    def decrypt(self, string):
        """Decrypt data with the key and the parameters set at initialization.

        The cipher object is stateful; decryption of a long block
        of data can be broken up in two or more calls to `decrypt()`.
        That is, the statement:

            >>> c.decrypt(a) + c.decrypt(b)

        is always equivalent to:

             >>> c.decrypt(a+b)

        That also means that you cannot reuse an object for encrypting
        or decrypting other data with the same key.

        This function does not perform any padding.

         - For `MODE_ECB`, `MODE_CBC` *string* length
           (in bytes) must be a multiple of *block_size*.

         - For `MODE_CFB`, *string* length (in bytes) must be a multiple
           of *segment_size*/8.

         - For `MODE_CTR` and `MODE_OFB`, *string* can be of any length.

        :param bytes string: The piece of data to decrypt.
        :raises ValueError:
            When a mode of operation has be requested this code cannot handle.
        :raises ValueError:
            When len(string) has a wrong length, as described above.
        :raises TypeError:
            When the counter in CTR returns data of the wrong length.

            The decrypted data, as a byte string. It is as long as
        :rtype: bytes
        if self.mode in (MODE_OFB, MODE_CTR):
            return self.encrypt(string)

        if self.mode == MODE_CFB:
            return self._encrypt_cfb(string, True)

        if self.mode not in (MODE_ECB, MODE_CBC):
            raise ValueError("Unknown mode of operation")

        out = []

        for block in split_blocks(string, self.block_size):
            if self.mode == MODE_ECB:
                dec = self.decrypt_block(self.key, block, **self.kwargs)
            else:  # self.mode == MODE_CBC
                decrypted_but_not_xored = self.decrypt_block(self.key,
                dec = xor_strings(self._status, decrypted_but_not_xored)
                self._status = block


        return b"".join(out)

    def encrypt_block(self, key, block, **kwargs):
        """Dummy function for the encryption of a single block.
        Overwrite with 'real' encryption function.

        :param bytes key: The symmetric encryption key.
        :param bytes block: A single plaintext block to encrypt.
        :param \\**kwargs: Additional parameters passed to `__init__`.

        :raises NotImplementedError: This method is to be overridden.

        :returns: ciphertext block
        :rtype: bytes"""
        raise NotImplementedError

    def decrypt_block(self, key, block, **kwargs):
        """Dummy function for the decryption of a single block.
        Overwrite with 'real' deryption function.

        :param bytes key: The symmetric encryption key.
        :param bytes block: A single ciphertext block to encrypt.
        :param \\**kwargs: Additional parameters passed to `__init__`.

        :raises NotImplementedError: This method is to be overridden.

        :returns: plaintext block
        :rtype: bytes"""
        raise NotImplementedError

    def _encrypt_with_keystream(self, data):
        """Encrypts data with the set keystream."""
        xor = [x ^ y for (x, y) in zip(map(b_ord, data),
        return bytes(bytearray(xor))  # Faster

    def _encrypt_cfb(self, data, decrypt=False):
        """Encrypts data in CFB mode."""
        out = []

        for block in split_blocks(data, self.segment_size // 8):
            encrypted_iv = self.encrypt_block(self.key, self._status)
            ecd = xor_strings(encrypted_iv, block)

            iv_p1 = self._status[self.segment_size // 8:]
            iv_p2 = block if decrypt else ecd

            self._status = iv_p1 + iv_p2


        return b"".join(out)

    def _create_keystream(self):
        "Creates a keystream (generator object) for OFB or CTR mode."
        if self.mode not in (MODE_OFB, MODE_CTR):
            # Coverage somehow does not work here
            return  # pragma: no cover

        while True:
            if self.mode == MODE_OFB:
                _next = self._status
            elif self.mode == MODE_CTR:
                _next = self._counter()
                if len(_next) != self.block_size:
                    raise TypeError("Counter length must be block_size")

            self._status = self.encrypt_block(self.key, _next, **self.kwargs)

            for k in self._status:
                yield b_ord(k)