piccolbo/Pweave

View on GitHub
pweave/readers.py

Summary

Maintainability
F
6 days
Test Coverage
# Pweave readers
import re
import copy
import json
import io
from subprocess import Popen, PIPE
import os
from urllib import request, parse


def read_file_or_url(source):
    """
    Try to open path as a file, and if its fails open it as url.
    """
    try:
        codefile = io.open(source, "r", encoding="utf-8")
        contents = codefile.read()
        codefile.close()
    except IOError:
        r = request.urlopen(source)
        contents = r.read().decode("utf-8")
        r.close()

    return contents


class PwebReader(object):
    """Reads and parses Pweb documents"""

    # regex that matches beginning of code block
    code_begin = r"^<<(.*?)>>=\s*$"
    doc_begin = r"^@$"

    def __init__(self, file=None, string=None):
        self.source = file

        # Get input from string or
        if file is not None:
            self.rawtext = read_file_or_url(self.source)
        else:
            self.rawtext = string
        self.state = "doc"  # Initial state of document

    def getparsed(self):
        return copy.deepcopy(self.parsed)

    def count_emptylines(self, line):
        """Counts empty lines for parser, the result is stored in self.n_emptylines"""
        if line.strip() == "":
            self.n_emptylines += 1
        else:
            self.n_emptylines = 0

    def codestart(self, line):
        if not re.match(self.code_begin, line):
            return False, True
        else:
            return True, True

    def docstart(self, line):
        if not re.match(self.doc_begin, line.strip()):
            return False, True
        else:
            return True, True

    def parse(self):
        lines = self.rawtext.splitlines()

        read = ""
        chunks = []
        codeN = 1
        docN = 1
        opts = {"option_string": ""}
        self.n_emptylines = 0
        self.lineNo = 0

        for line in lines:
            self.lineNo += 1
            (code_starts, skip) = self.codestart(line)
            if code_starts and self.state != "code":
                self.state = "code"
                opts = self.getoptions(line)
                chunks.append(
                    {
                        "type": "doc",
                        "content": read,
                        "number": docN,
                        "start_line": self.lineNo,
                    }
                )
                docN += 1
                read = ""
                if skip:
                    continue  # Don't append options code

            (doc_starts, skip) = self.docstart(line)
            if doc_starts and self.state == "code":
                self.state = "doc"
                if (
                    read.strip() != "" or "source" in opts
                ):  # Don't parse empty chunks unless source is specified
                    chunks.append(
                        {
                            "type": "code",
                            "content": "\n" + read.rstrip(),
                            "number": codeN,
                            "options": opts,
                            "start_line": self.lineNo,
                        }
                    )
                codeN += 1
                read = ""
                if skip:
                    continue

            if self.state == "doc":
                if hasattr(self, "strip_comments"):
                    line = self.strip_comments(line)

            read += line + "\n"
            self.count_emptylines(line)

        # Handle the last chunk
        if self.state == "code":
            chunks.append(
                {
                    "type": "code",
                    "content": "\n" + read.rstrip(),
                    "number": codeN,
                    "options": opts,
                    "start_line": self.lineNo,
                }
            )
        if self.state == "doc":
            chunks.append({"type": "doc", "content": read, "number": docN})
        self.parsed = chunks

    def getoptions(self, line):
        # Aliases for False and True to conform with Sweave syntax
        FALSE = False
        TRUE = True

        # Parse options from chunk to a dictionary
        # optstring = opt.replace('<<', '').replace('>>=', '').strip()
        optstring = re.findall(self.code_begin, line)[0]
        if not optstring.strip():
            return {"option_string": ""}
        # First option can be a name/label
        if optstring.split(",")[0].find("=") == -1:
            splitted = optstring.split(",")
            splitted[0] = 'name = "%s"' % splitted[0]
            optstring = ",".join(splitted)

        opt_scope = {}
        exec("chunkoptions =  dict(" + optstring + ")", opt_scope)
        chunkoptions = opt_scope["chunkoptions"]
        chunkoptions["option_string"] = optstring

        if "label" in chunkoptions:
            chunkoptions["name"] = chunkoptions["label"]

        return chunkoptions


class PwebMarkdownReader(PwebReader):
    def __init__(self, file=None, string=None):
        PwebReader.__init__(self, file, string)
        self.code_begin = r"^[`~]{3,}(?:\{|\{\.|)python(?:;|,|)\s*(.*?)(?:\}|\s*)$"
        self.doc_begin = r"^(`|~){3,}\s*$"


class PwebScriptReader(object):
    """Read scripts to Pweave"""

    doc_line = r"(^#'.*)|(^#%%.*)|(^# %%.*)"
    doc_start = r"(^#')|(^#%%)|(^# %%)"

    opt_line = r"(^#\+.*$)|(^#%%\+.*$)|(^# %%\+.*$)"
    opt_start = r"(^#\+)|(^#%%\+)|(^# %%\+)"

    def __init__(self, file=None, string=None):
        self.source = file

        # Get input from string or
        if file is not None:
            self.rawtext = read_file_or_url(self.source)
        else:
            self.rawtext = string
        self.state = "code"  # Initial state of document

    def getparsed(self):
        return copy.deepcopy(self.parsed)

    def count_emptylines(self, line):
        """Counts empty lines for parser, the result is stored in self.n_emptylines"""
        if line.strip() == "":
            self.n_emptylines += 1
        else:
            self.n_emptylines = 0

    def parse(self):
        lines = self.rawtext.splitlines()

        read = ""
        chunks = []
        codeN = 1
        docN = 1
        opts = {"option_string": ""}
        self.n_emptylines = 0
        self.lineNo = 0
        start_line = 1

        for line in lines:
            self.lineNo += 1
            if re.match(self.doc_line, line) and not re.match(self.opt_line, line):
                # line = line.replace("#' ", "", 1) #Need to fix with general!
                line = re.sub(self.doc_start, "", line, 1)
                if line.startswith(" "):
                    line = line.replace(" ", "", 1)
                if self.state == "code" and read.strip() != "":
                    chunks.append(
                        {
                            "type": "code",
                            "content": "\n" + read.rstrip(),
                            "number": codeN,
                            "options": opts,
                            "start_line": start_line,
                        }
                    )
                    codeN += 1
                    read = ""
                    start_line = self.lineNo
                self.state = "doc"
            elif re.match(self.opt_line, line):
                start_line = self.lineNo
                if self.state == "code" and read.strip() != "":
                    chunks.append(
                        {
                            "type": "code",
                            "content": "\n" + read.rstrip(),
                            "number": codeN,
                            "options": opts,
                            "start_line": start_line,
                        }
                    )
                    read = ""
                    codeN += 1
                if self.state == "doc" and read.strip() != "":
                    if docN > 1:
                        read = (
                            "\n" + read
                        )  # Add whitespace to doc chunk. Needed for markdown output
                    chunks.append(
                        {
                            "type": "doc",
                            "content": read,
                            "number": docN,
                            "start_line": start_line,
                        }
                    )
                    read = ""
                    docN += 1
                opts = self.getoptions(line)
                self.state = "code"
                continue
            elif self.state == "doc" and line.strip() != "" and read.strip() != "":
                self.state = "code"
                if docN > 1:
                    read = (
                        "\n" + read
                    )  # Add whitespace to doc chunk. Needed for markdown output
                chunks.append(
                    {
                        "type": "doc",
                        "content": read,
                        "number": docN,
                        "start_line": start_line,
                    }
                )
                opts = {"option_string": ""}
                start_line = self.lineNo
                read = ""
                docN += 1

            read += line + "\n"
            self.count_emptylines(line)

        # Handle the last chunk
        if self.state == "code":
            chunks.append(
                {
                    "type": "code",
                    "content": "\n" + read.rstrip(),
                    "number": codeN,
                    "options": opts,
                    "start_line": start_line,
                }
            )
        if self.state == "doc":
            chunks.append(
                {
                    "type": "doc",
                    "content": read,
                    "number": docN,
                    "start_line": start_line,
                }
            )
        self.parsed = chunks

    def getoptions(self, line):
        # Aliases for False and True to conform with Sweave syntax
        FALSE = False
        TRUE = True
        # Parse options from chunk to a dictionary
        optstring = re.sub(self.opt_start, "", line, 1)
        # optstring = opt.replace('#+', '', 1).strip()
        if optstring == "":
            return {"option_string": ""}
        # First option can be a name/label
        if optstring.split(",")[0].find("=") == -1:
            splitted = optstring.split(",")
            splitted[0] = 'name = "%s"' % splitted[0]
            optstring = ",".join(splitted)

        opt_scope = {}
        exec("chunkoptions =  dict(" + optstring + ")", opt_scope)
        chunkoptions = opt_scope["chunkoptions"]
        chunkoptions["option_string"] = optstring
        # Update the defaults

        if "label" in chunkoptions:
            chunkoptions["name"] = chunkoptions["label"]

        return chunkoptions


class PwebNBReader(object):
    """Read IPython notebooks"""

    def __init__(self, file=None, string=None):
        self.source = file
        self.parsed = []
        self.NB = json.loads(io.open(file, encoding="utf-8").read())

    def parse(self):
        docN = 1
        codeN = 1
        doc = self.NB["worksheets"][0]["cells"]

        for cell in doc:
            if cell["cell_type"] == "code":
                self.parsed.append(
                    {
                        "type": "code",
                        "content": "\n" + "".join(cell["input"]),
                        "options": {},
                        "number": codeN,
                    }
                )
                codeN += 1
            else:
                self.parsed.append(
                    {
                        "type": "doc",
                        "content": "\n" + "".join(cell["source"]),
                        "options": {},
                        "number": docN,
                    }
                )
                docN += 1

    def getparsed(self):
        return copy.deepcopy(self.parsed)


class PwebReaders(object):
    """Lists available input formats"""

    formats = {
        "noweb": {"class": PwebReader, "description": "Noweb document"},
        "script": {
            "class": PwebScriptReader,
            "description": "Python script with rogyxen markup",
        },
        "markdown": {"class": PwebMarkdownReader, "description": "Markdown document"},
        "notebook": {"class": PwebNBReader, "description": "IPython notebook"},
    }

    @classmethod
    def guess_reader(cls, filename):
        """Returns reader based on file extension"""
        _, ext = os.path.splitext(filename)
        ext = ext.lower()

        if ext.endswith("w"):
            return cls.get_reader("noweb")
        if "md" in ext:
            return cls.get_reader("markdown")

        # Script reader is the default, because in should be
        # able to read .py, *.jl, .R etc Jupyter supported formats
        return cls.get_reader("script")

    @classmethod
    def get_reader(cls, informat):
        """Get a reader based on reader name"""
        return cls.formats[informat]["class"]

    @classmethod
    def shortformats(cls):
        fmtstring = ""
        names = list(cls.formats.keys())
        n = len(names)
        for i in range(n):
            fmtstring += " %s" % (names[i])
            if i < (n - 1):
                fmtstring += ","

        return fmtstring

    @classmethod
    def getformats(cls):
        fmtstring = ""
        for format in sorted(cls.formats):
            fmtstring += "* %s:\n   %s\n" % (format, cls.formats[format]["description"])
        return fmtstring

    @classmethod
    def listformats(cls):
        print("\nPweave supported input formats:\n")
        print(cls.getformats())
        print("More info: http://mpastell.com/pweave/ \n")


class PwebConvert(object):
    """Convert from one input format to another"""

    def __init__(
        self, file=None, informat="script", outformat="noweb", pandoc_args=None
    ):
        self.informat = informat
        self.outformat = outformat

        self.doc = PwebReaders.formats[informat]["class"](file)

        self.pandoc_args = pandoc_args
        if self.informat == self.outformat:
            self.basename = re.split("\.+[^\.]+$", file)[0] + "_converted"
        else:
            self.basename = re.split("\.+[^\.]+$", file)[0]
        self.doc.parse()

    def format_docchunk(self, content):
        """Format doc chunks for output"""
        if self.pandoc_args is not None:
            pandoc = Popen(
                ["pandoc"] + self.pandoc_args.split(), stdin=PIPE, stdout=PIPE
            )
            pandoc.stdin.write(content.encode("utf-8"))
            content = (pandoc.communicate()[0]).decode("utf-8").replace("\r", "") + "\n"

        if self.outformat == "noweb":
            return content
        if self.outformat == "script":
            lines = content.splitlines()
            flines = [("#' " + x) for x in lines]
            return "\n".join(flines)

    def write(self):
        if self.outformat == "noweb":
            ext = ".Pnw"
        if self.outformat == "script":
            ext = ".py"
        file = self.basename + ext
        f = open(file, "w")
        f.write(self.converted)
        f.close()
        print("Output written to " + file)

    def convert(self):
        output = []

        if self.outformat == "noweb":
            code = "<<%s>>=%s\n@\n"
        if self.outformat == "script":
            code = "#+ %s\n%s\n"

        for chunk in self.doc.parsed:
            if chunk["type"] == "doc":
                output.append(self.format_docchunk(chunk["content"]))
            if chunk["type"] == "code":
                optstring = chunk["options"]["option_string"]
                output.append(code % (optstring, chunk["content"]))

        self.converted = "\n".join(output)


class PwebNBConvert(object):
    """Convert to IPython Notebook"""

    def __init__(
        self, file=None, informat="script", outformat="noweb", pandoc_args=None
    ):
        self.informat = informat
        self.outformat = outformat
        self.ext = ".ipynb"

        self.doc = PwebReaders.formats[informat]["class"](file)

        self.pandoc_args = pandoc_args
        if self.informat == self.outformat:
            self.basename = re.split("\.+[^\.]+$", file)[0] + "_converted"
        else:
            self.basename = re.split("\.+[^\.]+$", file)[0]
        self.doc.parse()

    def format_docchunk(self, content):
        """Format doc chunks for output.

        If self.pandoc_args is None, the docchunk will not be converted.
        """
        if self.pandoc_args is not None:
            pandoc = Popen(
                ["pandoc"] + self.pandoc_args.split(), stdin=PIPE, stdout=PIPE
            )
            pandoc.stdin.write(content)
            content = (pandoc.communicate()[0]).replace("\r", "") + "\n"
        return content

    def write(self):
        file = self.basename + self.ext
        f = open(file, "w")
        f.write(self.converted)
        f.close()
        print("Output written to " + file)

    def convert(self):
        from nbformat.v3 import (
            new_notebook,
            new_worksheet,
            new_code_cell,
            new_text_cell,
            writes_json,
        )

        ws = new_worksheet()

        for chunk in self.doc.parsed:
            if chunk["type"] == "doc":
                # TODO: this relies on pandoc converting into
                # markdown
                fmt = u"markdown"
                doc = self.format_docchunk(chunk["content"])
                ws.cells.append(new_text_cell(fmt, source=doc))
            if chunk["type"] == "code":
                lang = u"python"
                code = chunk["content"]
                ws.cells.append(new_code_cell(input=code, language=lang))

        NB = new_notebook(name="Pweaved ipython notebook", worksheets=[ws])

        self.converted = writes_json(NB)


class PwebConverters(object):
    """Lists available input / output formats"""

    formats = {
        "noweb": {"class": PwebConvert, "description": "Noweb document"},
        "script": {"class": PwebConvert, "description": "Script format"},
        "notebook": {"class": PwebNBConvert, "description": "IPython notebook"},
    }

    @classmethod
    def shortformats(cls):
        fmtstring = ""
        names = cls.formats.keys()
        n = len(names)
        for i in range(n):
            fmtstring += " %s" % (names[i])
            if i < (n - 1):
                fmtstring += ","

        return fmtstring

    @classmethod
    def getformats(cls):
        fmtstring = ""
        for format in sorted(cls.formats):
            fmtstring += "* %s:\n   %s\n" % (format, cls.formats[format]["description"])
        return fmtstring

    @classmethod
    def listformats(cls):
        print("\nPweave supported conversion formats:\n")
        print(cls.getformats())