Vipyr/vimcryption

View on GitHub
encryptionengine/engine.py

Summary

Maintainability
A
2 hrs
Test Coverage
"""
Definitions of the base classes needed to match the EncryptionEngine API.
"""
# Standard Library
from base64 import b64encode
import functools


class EncryptionEngine(object):
    """ Base vimcryption encryption engine.
    """
    @staticmethod
    def buffer_iter(data):
        # type: List[str]
        """ Generative iterator that yields each character
            from a list of strings, joined by newlines.
        """
        for index, line in enumerate(data):
            for character in line:
                yield character
            if index < len(data):
                yield "\n"

    @staticmethod
    def byte_iter(file_handle):
        # type: io.ByteIO
        """ Generative iterator that yields each byte from a BytesIO object.
        """
        character = file_handle.read(1)
        while character != b"":
            yield character
            character = file_handle.read(1)

    def __init__(self, prompt=input):
        """ Sets the input method, cipher_type by classname and prompts for a key, if any.
        """
        self.input = prompt
        self.cipher_type = self.__class__.__name__[:6].upper()
        self.cipher_key = self.get_cipher_key()

    def get_cipher_key(self):
        """ Default cipher key is empty-string.
        """
        return ""

    def encrypt(self, data, file_handle):
        # type: (Union[List[str], str], io.BytesIO):
        """ `encrypt` must be implemented by derived classes.
        """
        raise NotImplementedError(self.__class__.__name__ + ".encrypt must be implemented by a derived class!")

    def decrypt(self, file_handle, data):
        # type: (io.BytesIO, Union[List[str], str]):
        """ `decrypt` must be implemented by derived classes.
        """
        raise NotImplementedError(self.__class__.__name__ + ".decrypt must be implemented by a derived class!")

    def encrypt_file(self, data, file_handle):
        # type: (Union[List[str], str], io.BytesIO)
        """ Convenience function for writing the header and encrypting the buffer at once.
        """
        self.write_header(file_handle)
        self.encrypt(data, file_handle)

    def decrypt_file(self, file_handle, data):
        # type: (io.BytesIO, Union[List[str], str])
        """ Convenience function for reading the header and decrypting the file at once.
        """
        self.read_header(file_handle)
        self.decrypt(file_handle, data)

    def read_header(self, file_handle):
        """ Implement for additional meta-data needed for the cipher implementation
        """
        pass

    def write_header(self, file_handle):
        """ Requires anyone implementing encryption Engine to
            call super.writeHeader()
        """
        file_handle.write(b64encode('vimcrypted'))
        file_handle.write(b64encode(self.cipher_type))


class PassThrough(EncryptionEngine):
    """ Simple pass-through engine.
    """
    def encrypt(self, data, file_handle):
        # type: (Union[List[str], str], io.BytesIO):
        if isinstance(data, str):
            file_handle.write((data + "\n").encode("utf8"))
        else:
            for item in data:
                file_handle.write((item + "\n").encode("utf8"))

    def decrypt(self, file_handle, data):
        # type: (io.BytesIO, Union[List[str], str]):
        line = ""
        for bchar in self.byte_iter(file_handle):
            char = ""
            try:
                char = bchar.decode("utf8")
            except UnicodeDecodeError:
                char = "?"

            if char == "\n":
                data.append(line)
                line = ""
            else:
                line += char

        if line != "":
            data.append(line)

    def write_header(self, file_handle):
        pass


class BlockCipherEngine(EncryptionEngine):
    """ Base class for block ciphers.  Provides generic API for common utilities.
        Derived classes MUST define `self.encrypt_blocksize` and `self.decrypt_blocksize`
        in order to use the default encrypt/decrypt functions.  Derived classes
        may implement `encrypt` and `decrypt` instead.
        Supplied methods:
            Buffer iterator generator function
            BytesIO iterator generator function
            Block iterator generator function
            `encrypt` based on Block iterator
            `decrypt` based on block iterator
    """
    encrypt_blocksize = None
    decrypt_blocksize = None
    pad_character = None

    @staticmethod
    def block_iter(generator, block_size, pad):
        # type: (Iterable, int, Union[str, bytes])
        """ Generative iterator that yields a [byte] string of length `block_size`
            by concatenating characters yielded by `generator.  The final block
            is padded using `pad`.
        """
        block = []
        for item in generator:
            block.append(item)
            if len(block) == block_size:
                yield functools.reduce(lambda a, b: a + b, block)
                block = []
        if block:
            while len(block) < block_size:
                block.append(pad)
            yield functools.reduce(lambda a, b: a + b, block)

    def encrypt(self, data, file_handle):
        # type: (Union[Iterable[str], str], io.BytesIO)
        """ Block ciphers always encrypt blocks of a predefined size.  By default `encrypt` will consume
            `self.encrypt_blocksize` characters and call `self.encrypt_block` on that to determine what
            to write to `file_handle`.
            Derived classes are required to define `encrypt_blocksize` and define `encrypt_block`.
        """
        if isinstance(data, str):
            iterable = data
        else:
            iterable = self.buffer_iter(data)
        for block in self.block_iter(iterable, self.encrypt_blocksize, self.pad_character):
            file_handle.write(self.encrypt_block(block))

    def decrypt(self, file_handle, data):
        # type: (io.BytesIO, Union[Iterable[str], str])
        """ Block ciphers always decrypt blocks of a predefined size.  By default `decrypt` will consume
            `self.decrypt_blocksize` bytes and call `self.decrypt_block` on that to determine what
            to append to `data`.
            Derived classes are required to define `decrypt_blocksize` and define `decrypt_block`.
        """
        line = ""
        for block in self.block_iter(self.byte_iter(file_handle), self.decrypt_blocksize, self.pad_character.encode()):
            plaintext = self.decrypt_block(block)
            for character in plaintext:
                if character == "\n":
                    data.append(line)
                    line = ""
                else:
                    line += character
        if line != "":
            data.append(line)

    def encrypt_block(self, block):
        # type: (str)
        """ `encrypt_block` must be implemented by derived classes.
        """
        raise NotImplementedError(self.__class__.__name__ + ".encrypt_block must be implemented by a derived class!")

    def decrypt_block(self, block):
        # type: (bytes)
        """ `decrypt_block` must be implemented by derived classes.
        """
        raise NotImplementedError(self.__class__.__name__ + ".decrypt_block must be implemented by a derived class!")