src/aiscalator/core/utils.py
# -*- coding: utf-8 -*-
# Apache Software License 2.0
#
# Copyright (c) 2018, Christophe Duong
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Various Utility functions
"""
import hashlib
import logging
import os
import re
import webbrowser
from pathlib import Path
from shlex import quote
from subprocess import PIPE # nosec
from subprocess import STDOUT
from subprocess import Popen
from threading import Thread
from time import sleep
from aiscalator.core.log_regex_analyzer import LogRegexAnalyzer
def data_file(path):
"""
Utility function to find resources data file packaged along with code
Parameters
----------
path : path
path to the resource file in the package
Returns
-------
absolute path to the resource data file
"""
return os.path.join(os.path.abspath(os.path.dirname(__file__)), path)
def find(collection, item, field='name'):
"""
Finds an element in a collection which has a field equal
to particular item value
Parameters
----------
collection : Set
Collection of objects
item
value of the item that we are looking for
field : string
Name of the field from the object to inspect
Returns
-------
object
Corresponding element that has a field matching item in
the collection
"""
for element in collection:
if element[field] == item:
return element
return None
def copy_replace(src, dst, pattern=None, replace_value=None):
"""
Copies a file from src to dst replacing pattern by replace_value
Parameters
----------
src : string
Path to the source filename to copy from
dst : string
Path to the output filename to copy to
pattern
list of Patterns to replace inside the src file
replace_value
list of Values to replace by in the dst file
"""
file1 = open(src, 'r') if isinstance(src, str) else src
file2 = open(dst, 'w') if isinstance(dst, str) else dst
pattern = (
[pattern] if isinstance(pattern, str)
else pattern
)
replace_value = (
[replace_value] if isinstance(replace_value, str)
else replace_value
)
if replace_value and pattern:
if len(replace_value) != len(pattern):
raise Exception("Invalid parameters: pattern and replace_value"
" have different sizes.")
rules = [
(re.compile(regex, re.IGNORECASE), value)
for regex, value in zip(pattern, replace_value)
]
else:
rules = []
for line in file1:
if rules:
for rule in rules:
line = re.sub(rule[0], rule[1], line)
file2.write(line)
if isinstance(src, str):
file1.close()
if isinstance(dst, str):
file2.close()
def log_info(pipe):
""" Default logging function """
logger = logging.getLogger(__name__)
for line in iter(pipe.readline, b''):
logger.debug(line.decode("utf-8"))
return True
class BackgroundThreadRunner():
"""
Worker Thread to run logging output in the background
...
Attributes
----------
_process :
Process object of the command running in the background
_log_function : function(stream -> bool)
callback function to log the output of the command
_no_redirect : bool
whether the subprocess STDOUT and STDERR should be redirected to logs
_worker : Thread
Thread object
"""
def __init__(self, command, log_function, no_redirect=False):
self._no_redirect = no_redirect
if no_redirect:
self._process = Popen(command) # nosec
else:
self._process = Popen(command, stdout=PIPE, stderr=STDOUT) # nosec
self._log_function = log_function
self._worker = Thread(name='worker', target=self.run)
self._worker.start()
def run(self):
"""
Starts the Thread, process the output of the process.
"""
if not self._no_redirect:
self._log_function(self._process.stdout)
def process(self):
"""Returns the process object."""
return self._process
def subprocess_run(command, log_function=log_info,
no_redirect=False, wait=True):
"""
Run command in a subprocess while redirecting output to log_function.
The subprocess either runs synchroneoulsy or in the background depending on
the wait parameter.
Parameters
----------
command : List
Command to run in the subprocess
log_function : function
Callback function to log the output of the subprocess
no_redirect : bool
whether the subprocess STDOUT and STDERR should be redirected to logs
wait : bool
Whether the subprocess should be run synchroneously or in
the background
Returns
-------
int
return code of the subprocess
BackgroundThreadRunner
the thread running in the background
"""
if wait:
if no_redirect:
process = Popen(command, shell=False) # nosec
else:
process = Popen(command,
stdout=PIPE,
stderr=STDOUT,
shell=False) # nosec
with process.stdout:
log_function(process.stdout)
return process.wait()
else:
return BackgroundThreadRunner(command, log_function, no_redirect)
def format_file_content(content, prefix="", suffix=""):
"""
Reformat the content of a file line by line, adding prefix and suffix
strings.
Parameters
----------
content : str
path to the file to reformat its content
prefix : str
add to each line this prefix string
suffix : str
add to each line this suffix string
Returns
-------
str
Formatted content of the file
"""
result = ""
with open(content, "r") as file:
for line in file:
# TODO handle comments
# TODO check validity of the line for extra security
result += prefix + quote(line.replace('\n', '')) + suffix
return result
def sha256(file: str):
"""
Reads a file content and returns its sha256 hash.
"""
sha = hashlib.sha256()
with open(file, "rb") as content:
for line in content:
sha.update(line)
return sha.hexdigest()
def wait_for_jupyter_lab(commands, logger, notebook, port, folder):
"""
Starts jupyter lab and wait for it to start, returning the url it's
running from.
Parameters
----------
commands: list
List of commands to run to start the process
logger : logging.Logger
Logger object
notebook : str
path to the notebook
port :
port on which the jupyter lab is listening
folder : str
path in the container to reach the notebook
Returns
-------
str
url from which it is serving the jupyter lab
"""
log = LogRegexAnalyzer(b'.*http://.*:8888/.token=([a-zA-Z0-9]+)(\r)?\n')
logger.info("Running...: %s", " ".join(commands))
subprocess_run(commands, log_function=log.grep_logs, wait=False)
for i in range(5):
sleep(2)
if log.artifact():
break
msg = "docker run does not seem to be up yet..."
msg += " retrying (%s/5)"
logger.warning(msg, i)
if log.artifact():
# TODO handle url better (not always localhost?)
url = ("http://localhost:" + str(port) +
"/lab/tree/" + folder + "/" +
notebook + "?token=" +
log.artifact())
logger.info("%s is up and running.", url)
# TODO --no-browser option
webbrowser.open(url)
return url
return ""
def check_notebook(logger, code_path, from_format="py:percent"):
"""
Checks existence of notebook file and regenerates using
jupytext from associated .py file if possible.
Otherwise, create an empty notebook file.
Parameters
----------
code_path : str
path to the notebook to check
from_format : str
jupytext format of the .py input file
"""
notebook, notebook_py = notebook_file(code_path, from_format)
# TODO: check if last modified date of notebook_py is behind notebook
# then refresh it
commands = [
"jupytext", "--from", from_format, "--to", "notebook",
notebook_py, "-o", notebook,
"--set-formats", ".ipynb," + from_format
]
if not os.path.exists(code_path):
code_path_dir = os.path.dirname(code_path)
if code_path_dir:
os.makedirs(code_path_dir, exist_ok=True)
copy_replace(data_file("../config/template/notebook.json"),
code_path,
pattern="__format__", replace_value=from_format)
logger.info("Running...: %s", " ".join(commands))
subprocess_run(commands)
if os.path.isfile(notebook_py):
logger.info("Running...: %s", " ".join(commands + ["--sync"]))
returncode = subprocess_run(commands + ["--sync"])
if returncode:
logger.warning("Failed to synchronize jupytext notebook,"
+ " regenerating it")
logger.info("Running...: %s", " ".join(commands))
subprocess_run(commands)
# touch notebook.py so jupytext doesn't complain when
# opening in the jupyter lab when the py is behind the
# ipynb in modification time
Path(notebook_py).touch()
def check_notebook_dir(logger, code_path, from_format="py:percent"):
"""
Check a folder and generate all notebook files that might
be required in that folder.
Parameters
----------
code_path : str
path to a file in the folder
from_format : str
jupytext format of potential .py files
"""
check_notebook(logger, code_path, from_format)
code_path_dir = os.path.dirname(code_path)
for file in os.listdir(code_path_dir):
file = os.path.join(code_path_dir, file)
notebook, notebook_py = notebook_file(file)
if notebook != code_path and notebook_py != code_path:
if (file.endswith(from_format.split(":")[0]) or
file.endswith(".ipynb")):
check_notebook(logger, notebook, from_format)
def notebook_file(code_path, from_format="py:percent"):
"""
Parse a path to return both the ipynb and py versions of
the file.
Parameters
----------
code_path : str
path to a file
from_format : str
jupytext format of potential .py files
Returns
-------
(str, str)
tuple of 2 paths to ipynb and py files
"""
if '.' in code_path:
base_code_path = os.path.splitext(os.path.basename(code_path))[0]
code_path_dir = os.path.dirname(code_path)
code_path = os.path.join(code_path_dir, base_code_path)
code_extension = from_format.split(":")[0]
return code_path + '.ipynb', code_path + '.' + code_extension