# breathe/NotebookScripter -- NotebookScripter/_main.py

import types
import io
import os
import sys
import traceback
import typing
import pickle
import codecs

from IPython import get_ipython
from IPython.core.interactiveshell import InteractiveShell
from IPython.core.magic import Magics, magics_class, line_magic

from traitlets.config import MultipleInstanceError

from nbformat import read as read_notebook


# Holds values to be injected into the module execution context via receive_parameter / __receive_option
__notebookscripter_injected__ = [[{}, {}]]


def __add_parameter_frame(injected_parameters):
    global __notebookscripter_injected__
    __notebookscripter_injected__ += [[injected_parameters, {}]]


def __pop_parameter_frame():
    __notebookscripter_injected__.pop()


def set_notebook_option(
    **kwords
):
    """
    Customize run_notebook behavior.  Parameters:

    with_matplotlib_backend: Override behavior of ipython's matplotlib 'magic directive' -- by default reinterprets "%matplotlib inline" as "%matplotlib agg" -- set to None to disable

    """
    valid_parameters = ["with_matplotlib_backend"]
    for key, value in kwords.items():
        if key not in valid_parameters:
            raise ValueError(f"Unknown notebook configuration parameter: {key} -- valid parameters {','.join(valid_parameters)}")
        elif key == "with_matplotlib_backend":
            __notebookscripter_injected__[-1][1][key] = value
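
# Usage sketch (hedged): disable the matplotlib backend override before running a
# notebook.  Assumes set_notebook_option and run_notebook are exported at package level;
# "./my_notebook.ipynb" is a hypothetical path used only for illustration.
#
#     from NotebookScripter import run_notebook, set_notebook_option
#
#     set_notebook_option(with_matplotlib_backend=None)
#     module = run_notebook("./my_notebook.ipynb")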


def __receive_option(**kwords):
    """Receive configuration options.

    Exactly 1 keyword argument is required -- the key gives the option a name and
    the value provides the the 'default' value for the parameter.

    The default value is returned if the parameter was not provided in a call to set_notebook_option
    """

    if len(kwords) != 1:
        raise ValueError("Exactly 1 kword argument must be passed to receive_option")

    namespaces_to_search = [i for _, i in __notebookscripter_injected__]
    ret = []

    # search the option namespaces from the most recently pushed frame to the oldest
    param_name, default_value = next(iter(kwords.items()))
    for module_namespace in reversed(namespaces_to_search):
        if param_name in module_namespace:
            ret.append(module_namespace[param_name])

    # no frame contained the option -- fall back to the default value
    if not ret:
        ret.append(default_value)

    # return the most recently injected value (or the default)
    return ret[0]


def receive_parameter(**kwords):
    """Receive parameters from the outside world.

    Exactly 1 keyword argument is required -- the key gives the parameter a name and
    the value provides the the 'default' value for the parameter.

    The default value is returned if the parameter was not provided in the
    call to run_notebook.
    """

    # only a single keyword argument is accepted because keyword argument order is not
    # preserved in some python versions ...
    if len(kwords) != 1:
        raise ValueError("Exactly 1 keyword argument must be passed to receive_parameter")

    namespaces_to_search = [i for i, _ in __notebookscripter_injected__]
    ret = []

    # search the parameter namespaces from the most recently pushed frame to the oldest
    param_name, default_value = next(iter(kwords.items()))
    for module_namespace in reversed(namespaces_to_search):
        if param_name in module_namespace:
            ret.append(module_namespace[param_name])

    # no frame contained the parameter -- fall back to the default value
    if not ret:
        ret.append(default_value)

    # return the most recently injected value (or the default)
    return ret[0]
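
# Notebook-side sketch (hedged): a cell inside the target notebook declares a parameter
# and picks up a value injected by run_notebook, falling back to the default otherwise.
# The name "learning_rate" is hypothetical.
#
#     from NotebookScripter import receive_parameter
#
#     learning_rate = receive_parameter(learning_rate=0.01)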


class NotebookScripterEmbeddedIpythonShell(InteractiveShell):

    def enable_gui(self, gui=None):
        pass

    def init_sys_modules(self):
        """Override this to create an ipython shell appropriate for embedding similar to InteractiveShellEmbed.

        Needed to avoid creating new global namespace when running from command line console.
        """
        pass

    def init_prompts(self):
        """Override: don't mutate shell prompts.  Needed to avoid overtaking the interactive shell when this code is run from `python` command line console."""
        # Set system prompts, so that scripts can decide if they are running
        # interactively.
        # sys.ps1 = 'In : '
        # sys.ps2 = '...: '
        # sys.ps3 = 'Out: '


def register_magic(shell_instance, magic_cls):
    """
    Registers the provided shell_instance from IPython.

    Returns a function which undoes this.

    Rant: Why the f... does IPython not define it's own unregister function?

    :param magic_cls: The Magics class you wish to register.
    """

    # ugh I hate this code and I hate python so much ...
    undoes = {}
    original_magics = shell_instance.magics_manager.magics
    for magic_type, names in magic_cls.magics.items():
        if magic_type in original_magics:
            for magic_name, _ in names.items():
                if magic_name in original_magics[magic_type]:
                    undoesNamedMagics = undoes.setdefault(magic_type, {})
                    undoesNamedMagics[magic_name] = original_magics[magic_type][magic_name]

    shell_instance.register_magics(magic_cls)

    def unregister_magics():
        for magic_type, magic_names in undoes.items():
            for magic_name, magic_value in magic_names.items():
                shell_instance.magics_manager.magics[magic_type][magic_name] = magic_value

    return unregister_magics


def run_notebook(
        path_to_notebook: str,
        **hooks
) -> typing.Any:
    """Run a notebook within calling process

    Args:
        path_to_notebook: Path to .ipynb or .py file containing notebook code
    Returns:
        Returns newly created (anonymous) python module in which the target code was executed.
    """
    try:
        shell = NotebookScripterEmbeddedIpythonShell.instance()
    except MultipleInstanceError:
        # we are already embedded into an ipython shell -- just get that one.
        shell = get_ipython()

    unregister_magics = None

    with_backend = __receive_option(with_matplotlib_backend="agg")

    if with_backend:
        try:
            # try to initialize the matplotlib backend as early as possible
            # (cuts down on potential for complex bugs)
            import matplotlib
            matplotlib.use(with_backend, force=True)
        except ModuleNotFoundError:
            # don't error out here when matplotlib is missing -- instead there will be
            # a failure within the notebook if notebook actually tries to use
            # matplotlib ...
            pass

        @magics_class
        class NotebookScripterMagics(Magics):
            @line_magic
            def matplotlib(self, _line):
                "Override matplotlib magic to use non-interactive backend regardless of user supplied argument ..."
                import matplotlib
                matplotlib.use(with_backend, force=True)

        unregister_magics = register_magic(shell, NotebookScripterMagics)

    # create a new module scope for notebook execution
    module_identity = "loaded_notebook"
    dynamic_module = types.ModuleType(module_identity)
    dynamic_module.__file__ = path_to_notebook
    dynamic_module.__dict__['get_ipython'] = get_ipython

    # do some extra work to ensure that magics that would affect the user_ns
    # actually affect the notebook module's ns
    save_user_ns = shell.user_ns
    shell.user_ns = dynamic_module.__dict__

    __add_parameter_frame(hooks)

    # load the notebook source
    _, extension = os.path.splitext(path_to_notebook)
    is_ipynb = extension == ".ipynb"

    with io.open(path_to_notebook, 'r', encoding='utf-8') as f:
        if is_ipynb:
            notebook = read_notebook(f, 4)
        else:
            file_source = f.read()

    try:
        if is_ipynb:
            # execute ipynb notebook files
            for cell in notebook.cells:
                # loop over the code cells
                if cell.cell_type == 'code':
                    # transform the input to executable Python
                    code = shell.input_transformer_manager.transform_cell(
                        cell.source)

                    # run the code in the module
                    exec(code, dynamic_module.__dict__)

                    # # inject caller provided values into the module namespace after execution of any hook cells
                    # if 'metadata' in cell and 'NotebookScripterHookName' in cell.metadata:
                    #     hook_name = cell.metadata["NotebookScripterHookName"]
                    #     dynamic_module.__dict__.update(hooks.get(hook_name, {}))
        else:
            # execute .py files as notebooks
            code = shell.input_transformer_manager.transform_cell(file_source)

            # run the code in the module, compile first to provide source mapping support
            code_block = compile(code, path_to_notebook, 'exec')
            exec(code_block, dynamic_module.__dict__)
    finally:
        shell.user_ns = save_user_ns

        # revert the magics changes ...
        if unregister_magics:
            unregister_magics()

        # pop parameters stack
        __pop_parameter_frame()

    return dynamic_module
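
# Caller-side sketch (hedged): run a notebook in-process and inject a parameter.  The
# notebook path and parameter name are hypothetical.
#
#     from NotebookScripter import run_notebook
#
#     module = run_notebook("./train_model.ipynb", learning_rate=0.1)
#     # top-level names defined by the notebook become attributes of the returned module
#     print(module.learning_rate)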


class NotebookScripterWrappedException(Exception):
    def __init__(self):
        exc_type, exc_value, exc_tb = sys.exc_info()
        super().__init__(str(exc_value))
        self.exception = exc_value
        self.formatted = ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))

    def __str__(self):
        return '%s\nOriginal traceback:\n%s' % (Exception.__str__(self), self.formatted)


def worker(parent_to_child_queue, child_to_parent_queue, path_to_notebook, all_parent_parameters, **hooks):
    try:
        global __notebookscripter_injected__
        # at this point we are in a newly spawned process -- __notebookscripter_injected__ is equal to [[{}, {}]]
        # update it to hold the value passed in from the calling process
        __notebookscripter_injected__ = all_parent_parameters

        # then run the notebook
        dynamic_module = run_notebook(path_to_notebook, **hooks)

        # get the names of the return values requested by the caller, serialize the matching module values, then pass them back to the calling process
        return_values = parent_to_child_queue.get()

        if return_values:
            ret = serialize_return_values(dynamic_module.__dict__, return_values)
            child_to_parent_queue.put((None, ret))
        else:
            child_to_parent_queue.put((None, {}))
    except Exception:
        # if an exception occurred -- wrap it up and pass it back to the calling process
        wrapped_exception = NotebookScripterWrappedException()
        child_to_parent_queue.put((wrapped_exception, None))
    # worker subprocess is done -- a join() in the parent process will not block once execution reaches this point


def rehydrate(string_like):
    obj = str_to_obj(string_like)
    global __notebookscripter_injected__
    __notebookscripter_injected__ = obj


def dehydrate_return_values(namespace):
    names = __notebookscripter_injected__[-1][1].get("return_values", [])
    return obj_to_string_literal(serialize_return_values(namespace, names))


def obj_to_string_literal(obj):
    return codecs.encode(pickle.dumps(obj), "hex").strip()


def str_to_obj(str_value):
    return pickle.loads(codecs.decode(str_value, "hex"))
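
# Round-trip sketch (hedged): parameters and return values are pickled and hex-encoded so
# they can be embedded in generated source text and rehydrated in another process or kernel.
#
#     encoded = obj_to_string_literal({"a": 1})   # hex-encoded bytes, e.g. b'8004...'
#     assert str_to_obj(encoded) == {"a": 1}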


def serialize_return_values(namespace, names, to_string=False):
    obj = {k: simple_serialize(namespace[k]) for k in names if k in namespace}
    if to_string:
        obj = obj_to_string_literal(obj)
    return obj


def simple_serialize(obj):
    try:
        pickle.dumps(obj)
        # if we didn't raise, then (theoretically) obj should be serializable ...
        return obj
    except Exception:
        return repr(obj)


def run_notebook_in_process(
    path_to_notebook: str,
    **hooks
) -> typing.Callable:
    """Asynchronously run a notebook in a new subprocess.

    Returns a closure which, when called, will block until the execution has completed.

    Arguments passed to the closure name values to retrieve from the subprocess; those values are packaged into an (anonymous) python module returned by the closure.

    Args:
        path_to_notebook: Path to .ipynb or .py file containing notebook code
        **hooks: Keyword arguments made available to the notebook via receive_parameter
    Returns:
        Returns a closure which will block until the notebook execution completes, then return a newly created (anonymous) python module
        populated with the requested values retrieved from the subprocess.
    """

    import multiprocessing as mp
    import atexit

    context = mp.get_context("spawn")
    child_to_parent_queue = context.Queue()
    parent_to_child_queue = context.Queue()

    p = context.Process(target=worker, args=(parent_to_child_queue, child_to_parent_queue, path_to_notebook, __notebookscripter_injected__), kwargs=hooks)
    p.start()

    def _terminate_when_parent_process_ends():
        p.terminate()
        p.join()

    atexit.register(_terminate_when_parent_process_ends)

    def _block_and_receive_results(*return_values):
        """
        Block until the notebook execution has completed.  Then retrieve return_values from the subprocess's module scope and return
        the newly created (anonymous) python module populated with the requested values retrieved from the subprocess


        Args:
            return_values: Optional list of strings to pass back from subprocess -- values matching these names in the module created by invoking the notebook in a subprocess will be serialized passed across process boundaries back to this process, deserialized and made part of the returned module
        """

        atexit.unregister(_terminate_when_parent_process_ends)

        parent_to_child_queue.put(return_values)

        module_identity = "loaded_notebook_from_subprocess"
        dynamic_module = types.ModuleType(module_identity)
        dynamic_module.__file__ = path_to_notebook

        err, final_namespace = child_to_parent_queue.get()
        p.join()

        if err:
            raise err

        # inject retrieved return values into the returned module namespace
        dynamic_module.__dict__.update(final_namespace)

        return dynamic_module

    return _block_and_receive_results
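
# Usage sketch (hedged): run a notebook in a spawned subprocess, then block and pull the
# named values back into the returned module.  The notebook path and names are hypothetical.
#
#     from NotebookScripter import run_notebook_in_process
#
#     wait_for_results = run_notebook_in_process("./train_model.ipynb", learning_rate=0.1)
#     module = wait_for_results("accuracy")
#     print(module.accuracy)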


def run_notebook_in_jupyter(path_to_notebook: str,
                            **hooks
                            ) -> typing.Callable:
    """Run a notebook via a jupyter ipython kernel.

    Returns a closure which, when called, will block until the execution has completed.

    Positional arguments passed to the closure name values to retrieve from the executed kernel; those values are packaged into an (anonymous) python module returned by the closure.
    The closure also accepts a save_output_notebook parameter which is None by default -- if provided it should be a file path where the executed notebook, with computed output cells, will be written.

    Args:
        path_to_notebook: Path to .ipynb or .py file containing notebook code
        **hooks: Keyword arguments made available to the notebook via receive_parameter
    Returns:
        Returns a closure which will block until the notebook execution completes, then return a newly created (anonymous) python module
        populated with the requested values retrieved from the kernel.
    """
    from nbconvert.preprocessors.execute import executenb
    from nbformat import write as write_notebook
    from nbformat.notebooknode import from_dict as notebook_node_from_dict

    from jupyter_client import KernelManager
    from .NotebookPyFileReader import read_pyfile_as_notebook

    _, extension = os.path.splitext(path_to_notebook)
    is_ipynb = extension == ".ipynb"

    with open(path_to_notebook, 'r') as f:
        if is_ipynb:
            notebook = read_notebook(f, 4)
        else:
            notebook = read_pyfile_as_notebook(path_to_notebook)

    def _block_and_receive_results(*return_values, save_output_notebook=None):

        # add an extra cell to beginning of notebook to populate parameters
        notebook_parameters = __notebookscripter_injected__ + [[hooks, {"return_values": return_values}]]
        base64_parameters = obj_to_string_literal(notebook_parameters)

        initialization_source = """from NotebookScripter import (rehydrate as __rehydrate__, dehydrate_return_values as __dehydrate_return_values__)
__rehydrate__({})""".format(base64_parameters)

        initialization_cell = notebook_node_from_dict({
            "cell_type": "code",
            "execution_count": 0,
            "metadata": {},
            "outputs": [],
            "source": initialization_source
        })

        finalization_source = """__dehydrate_return_values__(locals())"""

        finalization_cell = notebook_node_from_dict({
            "cell_type": "code",
            "execution_count": 0,
            "metadata": {},
            "outputs": [],
            "source": finalization_source})

        notebook['cells'].insert(0, initialization_cell)
        notebook['cells'].append(finalization_cell)

        km = KernelManager()
        # hack -- start the kernel explicitly because the code path in ExecutePreprocessor that
        # starts a kernel when km has not already started one does not work here
        km.start_kernel()
        executed_notebook = executenb(notebook, timeout=None, km=km)
        km.shutdown_kernel()

        if save_output_notebook:
            if isinstance(save_output_notebook, str):
                with open(save_output_notebook, 'w') as f:
                    write_notebook(executed_notebook, f)
            else:
                write_notebook(executed_notebook, save_output_notebook)

        encoded_return_values = eval(executed_notebook["cells"][-1]["outputs"][0]["data"]["text/plain"])
        final_namespace = str_to_obj(encoded_return_values)

        module_identity = "loaded_notebook_from_subprocess"
        dynamic_module = types.ModuleType(module_identity)
        dynamic_module.__file__ = path_to_notebook

        # inject retrieved return values into the returned module namespace
        dynamic_module.__dict__.update(final_namespace)
        return dynamic_module
    return _block_and_receive_results
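
# Usage sketch (hedged): execute the notebook on a fresh jupyter kernel, optionally save
# the executed notebook, and retrieve named values.  Paths and names are hypothetical.
#
#     from NotebookScripter import run_notebook_in_jupyter
#
#     wait_for_results = run_notebook_in_jupyter("./train_model.ipynb", learning_rate=0.1)
#     module = wait_for_results("accuracy", save_output_notebook="./executed.ipynb")
#     print(module.accuracy)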