Aspidites/api/repl.py
# Aspidites
# Copyright (C) 2021 Ross J. Duff
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import contextlib
import pathlib
import shutil
import sys
from random import randint
import traceback
from itertools import cycle
import threading
import os
import re
import warnings
from contextlib import suppress
import time
from pathlib import Path
from traceback import print_exc
from typing import List, AnyStr, Union
import pyximport
pyximport.install()
from Cython.Compiler import Options
# noinspection PyUnresolvedReferences
from cython import (
declare as decl,
address as addr,
sizeof,
typeof,
struct,
cfunc,
ccall,
nogil,
no_gc,
inline,
union,
typedef,
cast,
char,
short,
int as cint,
bint,
short,
double,
long,
longdouble,
longdoublecomplex,
longlong,
complex,
float as cfloat,
)
from Aspidites._vendor.pyrsistent import (
pset as __pset,
pmap as __pmap,
s,
v,
)
from Aspidites._vendor.contracts.library.extensions import Extension
from Aspidites._vendor.pyparsing import ParseException, ParseResults
# noinspection PyUnresolvedReferences
from cmath import inf
# noinspection PyUnresolvedReferences
import Aspidites.api.parser
from Aspidites.api.compiler import Compiler, CompilerArgs
try:
import readline
except ImportError:
readline = None
histfile = Path("~/.woma_shell_history").expanduser()
histfile_size = 1000
START_PROMPT = ">>> "
CONTINUE_PROMPT = "... "
class Spinner: # pragma: no cover
busy = False
delay = 0.25
def __init__(self, delay=None, stdout=sys.stdout):
self.spinner_generator = (i for i in cycle("|/-\\"))
self.stdout = stdout
if delay and float(delay):
self.delay = delay
def spinner_task(self):
while self.busy:
self.stdout.write(next(self.spinner_generator))
self.stdout.flush()
time.sleep(self.delay)
self.stdout.write("\b")
self.stdout.flush()
def __enter__(self):
self.busy = True
threading.Thread(target=self.spinner_task).start()
def __exit__(self, exception, value, tb):
self.busy = False
time.sleep(self.delay)
if exception is not None:
return False
single_arg_help = re.compile(r"(?:help[\(])(\w+)(?:[\)])")
cy_opt = v(
"annotate",
"annotate_coverage_xml",
"buffer_max_dims",
"cache_builtins",
"cimport_from_pyx",
"clear_to_none",
"closure_freelist_size",
"convert_range",
"docstrings",
"embed_pos_in_docstring",
"generate_cleanup_code",
"fast_fail",
"warning_errors",
"error_on_unknown_names",
"error_on_uninitialized",
"gcc_branch_hints",
"lookup_module_cpdef",
"embed",
)
cy_kwargs = dict(zip(cy_opt, map(lambda x: getattr(Options, x), cy_opt)))
class Help:
doc_leader = ""
doc_header = "Documented commands (type help <topic>):"
misc_header = "Miscellaneous help topics:"
undoc_header = "Undocumented commands:"
nohelp = "*** No help on %s"
ruler = "┉"
def __init__(self, parent):
"""
>>> h = Help(self)
>>> h('help')
"""
self.parent = parent
self.names = dir(self.parent.__class__)
self.stdout = self.parent.stdout
self.cmds_doc = []
self.cmds_undoc = []
def __call__(self, arg):
if arg:
try:
func = getattr(self.parent, "help_" + arg)
except AttributeError:
try:
doc = getattr(self.parent, "do_" + arg).__doc__
if doc:
self.stdout.write("%s\n" % str(doc))
return
except AttributeError:
pass
self.stdout.write("%s\n" % str(self.nohelp % (arg,)))
return
func()
else:
self.help = {}
for name in self.names:
if name[:5] == "help_":
self.help[name[5:]] = 1
self.names.sort()
# There can be duplicates if routines overridden
self.handle_names()
self.stdout.write("%s\n" % str(self.doc_leader))
self.print_topics(self.doc_header, self.cmds_doc, 15, 80)
self.print_topics(self.misc_header, list(self.help.keys()), 15, 80)
self.print_topics(self.undoc_header, self.cmds_undoc, 15, 80)
def handle_names(self):
prevname = ""
for name in self.names:
if name[:3] == "do_":
if name == prevname:
continue
prevname = name
cmd = name[3:]
if cmd in self.help:
self.cmds_doc.append(cmd)
del self.help[cmd]
elif getattr(self.parent, name).__doc__:
self.cmds_doc.append(cmd)
else:
self.cmds_undoc.append(cmd)
def print_topics(self, header, cmds, cmdlen, maxcol):
if cmds:
self.stdout.write("%s\n" % str(header))
if self.ruler:
self.stdout.write("╭%s╮\n" % str(self.ruler * len(header)))
self.columnize(cmds, maxcol - 1)
self.stdout.write("\n")
def columnize(self, list, displaywidth=80):
"""Display a list of strings as a compact set of columns.
Each column is only as wide as necessary.
Columns are separated by two spaces (one was not legible enough).
"""
if not list:
self.stdout.write("<empty>\n")
return
nonstrings = [i for i in range(len(list)) if not isinstance(list[i], str)]
if nonstrings:
raise TypeError(
"list[i] not a string for i in %s" % ", ".join(map(str, nonstrings))
)
size = len(list)
if size == 1:
self.stdout.write(" %s\n" % str(list[0]))
return
# Try every row count from 1 upwards
for nrows in range(1, len(list)):
ncols = (size + nrows - 1) // nrows
colwidths = []
totwidth = -2
for col in range(ncols):
colwidth = 0
for row in range(nrows):
i = row + nrows * col
if i >= size:
break
x = list[i]
colwidth = max(colwidth, len(x))
colwidths.append(colwidth)
totwidth += colwidth + 2
if totwidth > displaywidth:
break
if totwidth <= displaywidth:
break
else:
nrows = len(list)
ncols = 1
colwidths = [0]
for row in range(nrows):
texts = []
for col in range(ncols):
i = row + nrows * col
if i >= size:
x = ""
else:
x = list[i]
texts.append(x)
while texts and not texts[-1]:
del texts[-1]
for col in range(len(texts)):
texts[col] = texts[col].ljust(colwidths[col])
self.stdout.write(" %s\n" % str(" ".join(texts)))
class ReadEvalParse: # pragma: no cover
intro = (
"Welcome to the Woma Interactive Shell. Use the 'help()' or '?' command to see a list of commands.\nThis is experimental and mainly aims to help developers to sandbox "
"Woma without compilation."
)
_globals = globals().copy()
def __init__(self, stdout=None):
if stdout is not None:
self.stdout = stdout
else:
self.stdout = sys.stdout
self.warn = lambda x: sys.stderr.write(x) and sys.stderr.flush()
self.tmpdir = pathlib.Path("tmp")
self.__locals__ = dict(locals(), **globals())
def input(self):
self.stdout.flush()
line = input(START_PROMPT)
if "))" in line and line.endswith(tuple(Extension.registrar.keys()) + ("*",)):
line += "\n "
while True:
line2 = input(CONTINUE_PROMPT)
if "(!)" in line2:
line += line2 + "\n "
else:
line += line2 + "\n "
if line.endswith("\n \n "):
break
return line
def get_names(self):
# This method used to pull in base class attributes
# at a time dir() didn't do it yet.
return dir(self.__class__)
def find_token(self, token: str, text: str) -> bool:
return text.find(token) != -1
def displayhook(self, text):
if text is None:
return
try:
self.stdout.write(str(text))
except UnicodeEncodeError:
bytes = text.encode(sys.stdout.encoding, "backslashreplace")
if hasattr(self.stdout, "buffer"):
self.stdout.buffer.write(bytes)
else:
text = bytes.decode(sys.stdout.encoding, "strict")
self.stdout.write(text)
self.stdout.write("\n")
def eval_exec(self, x: Union[List, AnyStr]):
# noinspection PyBroadException
if isinstance(x, ParseResults):
x = x[0]
# noinspection PyBroadException
warnings.resetwarnings()
try:
out = eval(
compile(x, filename="<inline code>", mode="eval"),
self.__locals__,
self.__locals__,
)
except Exception:
code = compile(x, filename="<inline code>", mode="exec")
out = exec(code, self.__locals__, self.__locals__)
# if out is not None:
# print(out)
self.displayhook(out)
def preloop(self):
if readline and Path(histfile).exists():
readline.read_history_file(histfile)
with contextlib.suppress(FileExistsError):
self.tmpdir.mkdir()
def postloop(self):
if readline:
readline.set_history_length(histfile_size)
readline.write_history_file(histfile)
def loop(self) -> None:
os.system("cls" if os.name == "nt" else "clear")
self.stdout.write(self.intro + "\n")
count = 0
try:
while True:
self.postloop()
self.preloop()
try:
line = self.input()
if line == "":
continue
if line == "exit()":
raise SystemExit
if self.find_token("?", line):
self.do_help(line.lstrip("? "))
continue
if self.find_token("help ", line):
self.do_help(line.split(" ")[1])
continue
if single_arg_help.match(line):
self.do_help(single_arg_help.search(line).group(1))
continue
if hasattr(self, "do_" + line):
getattr(self, "do_" + line)()
continue
if str(line).isidentifier():
self.eval_exec(line)
continue
try:
p = Aspidites.api.parser.parse_module(line)
except ParseException:
self.warn(
f'Warning: Failed to parse "{line}" as Woma.\n'
f"Remember that Woma does not allow literal evaluation, try assigning to a variable.\n"
f"Falling back to python with suppressed exceptions.\n"
)
with suppress(Exception):
self.eval_exec(line)
continue
else:
num = randint(1, 10000000000000000000)
file = self.tmpdir / f"module{num}.pyx"
args = CompilerArgs(
fname=file,
code=p,
force=True,
bytecode=False,
c=False,
build_requires="",
verbose=0,
**cy_kwargs,
)
with suppress(ResourceWarning):
Compiler(args)
with suppress(Exception):
try:
module = __import__(
f"tmp.module{num}", locals=globals(), fromlist=["*"]
)
all_names = [
name
for name in dir(module)
if not name.startswith("_")
]
self.__locals__.update(
{name: getattr(module, name) for name in all_names}
)
except Exception:
self.eval_exec(p)
continue
except Exception as e:
self.stdout.write(f"Error: {e}\n")
traceback.print_tb(e.__traceback__)
shutil.rmtree(self.tmpdir)
continue
except KeyboardInterrupt as e:
self.do_exit()
def do_help(self, arg=None):
h = Help(self)
h(arg)
def do_exit(self, arg=None):
"""Exit the woma interactive interpreter."""
shutil.rmtree(self.tmpdir)
self.stdout.write("\nExiting...")
raise SystemExit
def do_copyright(self):
"""Copyright Ross J. Duff 2021 licensed under the GNU Public License v3."""
pass
def do_locals(self, arg=None):
"""Print local variables"""
hidden = ["self", "stdout", "ReadEvalParse", "__warningregistry__"]
visible = dict()
for k, v in self.__locals__.items():
if k in self._globals.keys() or k in hidden:
continue
else:
visible[k] = v
from Aspidites.api.api import _format_locals
self.stdout.write(_format_locals(visible).decode("UTF-8"))
def do_flush(self):
"""forcibly flush stdout"""
self.stdout.flush()
if __name__ == "__main__":
rep = ReadEvalParse()
rep.loop()