piccolbo/Pweave

View on GitHub
pweave/processors/jupyter.py

Summary

Maintainability
B
6 hrs
Test Coverage
# -*- coding: utf-8 -*-

from jupyter_client.manager import start_new_kernel
from jupyter_client import KernelManager
from nbformat.v4 import output_from_msg
import os

from .. import config
from .base import PwebProcessorBase
from . import subsnippets
from IPython.core import inputsplitter
from ipykernel.inprocess import InProcessKernelManager

from queue import Empty


class JupyterProcessor(PwebProcessorBase):
    """Generic Jupyter processor, should work with any kernel.

    Starts a kernel and connects a client to it, then executes code chunks in
    that kernel, collecting what the kernel publishes as nbformat v4 output
    nodes. The kernel runs either in-process (``InProcessKernelManager``) or
    via ``KernelManager``.
    """

    # What run_cell does when waiting for the shell reply exceeds
    # ``self.timeout``: interrupt the kernel (True) or raise TimeoutError
    # (False). run_cell reads this attribute but it used to be set nowhere,
    # so an expired timeout ended in AttributeError.
    interrupt_on_timeout = False

    def __init__(
        self, parsed, kernel, source, mode, figdir, outdir, embed_kernel=False
    ):
        """Start the kernel and connect a client to it.

        :param kernel: kernel name handed to the kernel manager.
        :param outdir: directory used (as an absolute path) as the kernel's
            working directory.
        :param embed_kernel: use ``InProcessKernelManager`` instead of
            ``KernelManager``.
        :raises RuntimeError: re-raised from ``wait_for_ready`` when the kernel
            does not become ready; channels are stopped and the kernel is shut
            down first.
        """
        super(JupyterProcessor, self).__init__(
            parsed, kernel, source, mode, figdir, outdir
        )

        self.extra_arguments = None
        # Seconds to wait for a cell's shell reply; negative means no limit.
        self.timeout = -1
        path = os.path.abspath(outdir)

        if embed_kernel:
            km = InProcessKernelManager(kernel_name=kernel)
        else:
            km = KernelManager(kernel_name=kernel)

        # Kernel stderr is discarded. The handle is kept (and closed in
        # close()) instead of being leaked for the life of the process; it is
        # not closed right after launch in case the manager reuses its launch
        # arguments later.
        self._devnull = open(os.devnull, "w")
        try:
            km.start_kernel(cwd=path, stderr=self._devnull)
        except Exception:
            self._devnull.close()
            raise

        kc = km.client()
        kc.start_channels()
        try:
            kc.wait_for_ready()
        except RuntimeError:
            print(
                "Timeout from starting kernel\nTry restarting python session and running weave again"
            )
            kc.stop_channels()
            km.shutdown_kernel()
            self._devnull.close()
            raise

        self.km = km
        self.kc = kc
        self.kc.allow_stdin = False

    def close(self):
        """Stop the client channels, shut the kernel down and release the
        stderr sink opened in ``__init__``."""
        self.kc.stop_channels()
        self.km.shutdown_kernel()
        self._devnull.close()

    def run_cell(self, src):
        """Execute ``src`` (leading whitespace stripped) and return its outputs.

        First waits on the shell channel for the reply to this execute request
        (bounded by ``self.timeout`` seconds, unbounded when negative), then
        drains the IOPub channel until the kernel reports ``idle`` for it.

        :returns: list of output nodes built by ``nbformat.v4.output_from_msg``;
            a ``clear_output`` message discards everything collected so far.
        :raises TimeoutError: the shell reply timed out and
            ``interrupt_on_timeout`` is false.
        :raises RuntimeError: no IOPub message arrived within 4 seconds.
        """
        code = src.lstrip()
        msg_id = self.kc.execute(code, store_history=False)

        # A negative timeout means "block until the reply arrives".
        timeout = None if self.timeout < 0 else self.timeout

        # Wait for the execute_reply to our request, skipping other replies.
        while True:
            try:
                msg = self.kc.get_shell_msg(timeout=timeout)
            except Empty:
                if self.interrupt_on_timeout:
                    self.km.interrupt_kernel()
                    break
                raise TimeoutError("Cell execution timed out, see log for details.")

            if msg["parent_header"].get("msg_id") == msg_id:
                break

        outs = []

        while True:
            try:
                # We've already waited for execute_reply, so all output
                # should already be waiting. However, on slow networks, like
                # in certain CI systems, waiting < 1 second might miss messages.
                # So long as the kernel sends a status:idle message when it
                # finishes, we won't actually have to wait this long, anyway.
                msg = self.kc.iopub_channel.get_msg(block=True, timeout=4)
            except Empty:
                print(
                    "Timeout waiting for IOPub output\nTry restarting python session and running weave again"
                )
                raise RuntimeError("Timeout waiting for IOPub output")

            msg_type = msg["msg_type"]

            # stdout from InProcessKernelManager has no parent_header, so
            # stream messages are accepted whatever their parent.
            if msg["parent_header"].get("msg_id") != msg_id and msg_type != "stream":
                continue

            content = msg["content"]

            if msg_type == "status":
                if content["execution_state"] == "idle":
                    break
                continue
            if msg_type == "execute_input" or msg_type.startswith("comm"):
                continue
            if msg_type == "clear_output":
                outs = []
                continue

            try:
                out = output_from_msg(msg)
            except ValueError:
                print("unhandled iopub msg: " + msg_type)
            else:
                outs.append(out)

        return outs

    def loadstring(self, code_str, **kwargs):
        """Run ``code_str`` as a single cell and return its output list."""
        return self.run_cell(code_str)

    def loadterm(self, code_str, **kwargs):
        """Terminal-style evaluation fallback for generic kernels.

        Splitting into individual statements is not implemented here: the
        whole chunk runs as one cell. The result keeps the
        ``(sources, outputs)`` shape used by ``IPythonProcessor.loadterm``:
        a list of source strings and a parallel list of output lists.
        (This previously returned the undefined name ``sources`` and always
        raised NameError.)
        """
        return [code_str], [self.run_cell(code_str)]

    # TODO add support for "rich" output
    # Requires storing the results for formatter
    def load_inline_string(self, code_string):
        """Run ``code_string`` and return its textual output.

        Stream text, ANSI-stripped tracebacks and ``text/plain`` data are
        concatenated in order. An output with no ``text/plain`` representation
        resets the accumulated text to ``""``.
        """
        from nbconvert import filters

        outputs = self.loadstring(code_string)
        result = ""
        for out in outputs:
            if out["output_type"] == "stream":
                result += out["text"]
            elif out["output_type"] == "error":
                result += filters.strip_ansi("".join(out["traceback"]))
            elif "text/plain" in out["data"]:
                result += out["data"]["text/plain"]
            else:
                # NOTE(review): discards text collected *before* this output
                # only. Possibly meant to hide an object repr that precedes a
                # figure; confirm intent before changing.
                result = ""
        return result


class IPythonProcessor(JupyterProcessor):
    """Jupyter processor with IPython-specific behaviour.

    On top of the generic processor it optionally runs a matplotlib setup
    snippet, applies per-chunk figure size/dpi before each chunk, and
    implements terminal-style evaluation by splitting a chunk into
    statements with IPython's input splitter.
    """

    def __init__(self, *args):
        """Forward ``args`` unchanged to the parent constructor.

        ``args[1]`` is the kernel name: ``"python3"`` is run in-process
        (``embed_kernel=True``), any other kernel out of process. The
        matplotlib snippet runs when ``config.rcParams["usematplotlib"]`` is
        truthy.
        """
        run_in_process = args[1] == "python3"
        super().__init__(*args, embed_kernel=run_in_process)
        if config.rcParams["usematplotlib"]:
            self.init_matplotlib()

    def init_matplotlib(self):
        """Execute ``subsnippets.init_matplotlib`` in the kernel."""
        self.loadstring(subsnippets.init_matplotlib)

    def pre_run_hook(self, chunk):
        """Set matplotlib's figure size and dpi from the chunk options.

        ``chunk["f_size"]`` is formatted as a pair of integers and
        ``chunk["dpi"]`` as an integer. Assumes ``matplotlib`` is already
        imported in the kernel (presumably by the init snippet; verify).
        """
        size_stmt = (
            'matplotlib.rcParams.update({"figure.figsize" : (%i, %i)})'
            % chunk["f_size"]
        )
        dpi_stmt = 'matplotlib.rcParams.update({"figure.dpi" : %i})' % chunk["dpi"]
        self.loadstring(size_stmt + "\n" + dpi_stmt)

    def loadterm(self, code_str, **kwargs):
        """Run ``code_str`` one complete statement at a time.

        Lines are fed to an ``IPythonInputSplitter``. Whenever it reports it
        can accept no more input, the buffered statement is executed before
        the next line is pushed.

        :returns: ``(sources, outputs)``, where ``sources[i]`` is the i-th
            executed statement and ``outputs[i]`` is its output list.
        """
        splitter = inputsplitter.IPythonInputSplitter()
        sources, outputs = [], []

        def flush():
            # Execute whatever statement the splitter has buffered.
            statement = splitter.source
            sources.append(statement)
            outputs.append(self.loadstring(statement))

        for line in code_str.lstrip().splitlines():
            if not splitter.push_accepts_more():
                flush()
                splitter.reset()
            splitter.push(line)

        # Trailing statement still buffered after the last line.
        if splitter.source != "":
            flush()

        return sources, outputs