williamgibb/pyFuckery

View on GitHub
src/fuckery/vm.py

Summary

Maintainability
A
3 hrs
Test Coverage
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
pyFuckery - vm.py
Created on 2/12/17.

VM Definition to execute brainfuck programs which have been parsed into lark.Tree objects.
"""
# Stdlib
import argparse
import hashlib
import logging
import sys

# Custom Code
from fuckery.constants import DEFAULT_MEMORY_SIZE
from fuckery.constants import SYM_ACTIONS
from fuckery.constants import SYM_DATA_DEC
from fuckery.constants import SYM_DATA_INC
from fuckery.constants import SYM_EXPRESSIONS
from fuckery.constants import SYM_IO_INPUT
from fuckery.constants import SYM_IO_OUTPUT
from fuckery.constants import SYM_LOOP
from fuckery.constants import SYM_PROGRAM
from fuckery.constants import SYM_PTR_DEC
from fuckery.constants import SYM_PTR_INC
from fuckery.exc import ExitCondition
from fuckery.exc import VMError
from fuckery.memory import Storage
from fuckery.parser import parse_program
# Third Party Code
from lark import Tree

log = logging.getLogger(__name__)


class VirtualMachine(object):
    """
    This is the brainfuck VM.  You can drop this into programs that need a brainfuck VM, such as a module you don't
    want a coworker to ever easily maintain, or a really cruel programming based game.
    """
    def __init__(self,
                 memory_size: int = DEFAULT_MEMORY_SIZE,
                 loop_detection: bool = False):
        """
        Init function for the VM.

        :param memory_size: Number of memory cells to instantiate.
        :param loop_detection: Enables loop detection if this evaluates to True.  This is very costly from a
        computation perspective, so use it wisely.
        """
        self.data_pointer = 0
        self._memory_size = memory_size
        self.memory = Storage(n=memory_size)
        self.loop_detection = loop_detection
        self.stream_in = sys.stdin
        self.stream_out = sys.stdout
        self.sym2func = {
            SYM_DATA_DEC: self.dec_data_value,
            SYM_DATA_INC: self.inc_data_value,
            SYM_IO_INPUT: self.io_input,
            SYM_IO_OUTPUT: self.io_output,
            SYM_PTR_DEC: self.dec_data_ptr,
            SYM_PTR_INC: self.inc_data_ptr,
            }

    @property
    def current_value(self) -> int:
        """
        Property which represents the data value the current memory address points too.

        :return:
        """
        return self.memory.get(self.data_pointer)

    @property
    def state_hash(self) -> str:
        """
        MD5 representing the state of the system.  It is a hash of the memory and the current data pointer.

        Note - Computing this frequently can be expensive to do with a large memory section,
        as the memory section is serialized via msgpack.dumps() prior to hashing.

        :return:
        """
        s = self.memory.mem_hash + str(self.data_pointer)
        ret = hashlib.md5(s.encode()).hexdigest()
        return ret

    def inc_data_ptr(self) -> None:
        """
        Increments the data pointer by 1.

        :return: None
        """
        temp = self.data_pointer + 1
        if temp not in self.memory:
            raise VMError(f'Invalid memory address: {temp}')
        self.data_pointer = temp

    def dec_data_ptr(self) -> None:
        """
        Decrements the data pointer by 1.

        :return: None
        """
        temp = self.data_pointer - 1
        if temp not in self.memory:
            raise VMError(f'Invalid memory address: {temp}')
        self.data_pointer = temp

    def inc_data_value(self) -> None:
        """
        Increments the value pointed to by the data pointer by 1.
        This wraps at 255, back to zero.

        :return: None
        """
        v = self.memory.get(self.data_pointer)
        v = (v + 1) % 256
        self.memory.set(addr=self.data_pointer, value=v)

    def dec_data_value(self) -> None:
        """
        Decrements the value pointed to by the data pointer by 1.
        This wraps at zero, back to 255.

        :return: None
        """
        v = self.memory.get(self.data_pointer)
        v = (v - 1) % 256
        self.memory.set(addr=self.data_pointer, value=v)

    def io_output(self) -> None:
        """
        Writes the current value, after casting it via chr(), to self.stream_out.

        :return: None
        """
        v = self.current_value
        s = chr(v)
        # Ducktyping stream_out
        self.stream_out.write(s)

    def io_input(self) -> None:
        """
        Reads a single character from self.stream_in.

        If self.stream_in is sys.stdin (default value), it will prompt the user for a string and record the FIRST
        byte of that string.  Otherwise, it will attempt to read 1 byte from the stream_in buffer.

        Empty inputs have no effect on the state of the system.

        :return: None
        """
        self.memory.set(addr=self.data_pointer, value=0)
        try:
            if self.stream_in is sys.stdin:  # pragma: no cover
                v = input('>')
                v = v[0]
            else:
                v = self.stream_in.read(1)
            v = ord(v)
        except (TypeError, IndexError):
            return
        except KeyboardInterrupt:
            raise ExitCondition('Keyboard error - exiting.')
        else:
            self.memory.set(addr=self.data_pointer, value=v)

    def run(self, tree: Tree) -> None:
        """
        Walk a Brainfuck AST and execute the program contained in the AST.

        This function is recursive, so its possible for a deeply nested program to hit the Python interpreter recursion
        limit, but if your brainfuck does that, kudos to you.

        :param tree: Parsed brainfuck program.
        :return:
        """
        # XXX This is a manual tree parsing.  Can we do better with a lark Transformer?
        if tree.data == SYM_PROGRAM:
            log.debug('Program entry point.')
            for t in tree.children:
                self.run(t)
            return
        if tree.data == SYM_ACTIONS:
            token = tree.children[0]
            func = self.sym2func.get(token)
            func()
            return
        if tree.data == SYM_EXPRESSIONS:
            for i, t in enumerate(tree.children):
                # log.debug(f'Executing command #{i} - {repr(t)}')
                self.run(tree=t)
            return
        if tree.data == SYM_LOOP:
            sh = ''
            if self.loop_detection:
                sh = self.state_hash
            while self.current_value != 0:
                for t in tree.children:
                    if isinstance(t, Tree):
                        self.run(tree=t)
                if self.loop_detection:
                    if self.state_hash == sh:
                        raise VMError('Infinite loop detected - no change in memory during loop execution!')
                    sh = self.state_hash
            return
        raise NotImplementedError(f'Unknown tree type seen: {tree.data}')  # pragma: no cover

    def parse_and_run(self, program: str) -> None:
        """
        Parse and run a brainfuck program.

        :param program: String representing a brainfuck program.
        :return: None
        """
        t = parse_program(s=program)
        self.run(tree=t)


# noinspection PyMissingOrEmptyDocstring
def main(options):  # pragma: no cover
    if not options.verbose:
        logging.disable(logging.DEBUG)
    log.info('Executing "Hello World!"')
    s = '++++++++[>++++[>++>+++>+++>+<<<<-]>+>+>->>+[<]<-]>>.>---.+++++++..+++.>>.<-.<.+++.------.--------.>>+.>++.'
    t = parse_program(s=s)
    vm = VirtualMachine()
    vm.run(tree=t)
    sys.exit(0)


# noinspection PyMissingOrEmptyDocstring
def makeargpaser():  # pragma: no cover
    parser = argparse.ArgumentParser(description="Run a simple hello world program.")
    parser.add_argument('-v', '--verbose', dest='verbose', default=False, action='store_true',
                        help='Enable verbose output')
    return parser


def _main():  # pragma: no cover
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(funcName)s]')
    p = makeargpaser()
    opts = p.parse_args()
    main(opts)


if __name__ == '__main__':  # pragma: no cover
    _main()