suoto/hdl_checker

View on GitHub
hdl_checker/utils.py

Summary

Maintainability
C
1 day
Test Coverage
# This file is part of HDL Checker.
#
# Copyright (c) 2015 - 2019 suoto (Andre Souto)
#
# HDL Checker is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# HDL Checker is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with HDL Checker.  If not, see <http://www.gnu.org/licenses/>.
"Common stuff"

import abc
import functools
import inspect
import logging
import os
import os.path as p
import pprint
import re
import shutil
import signal
import subprocess as subp
import sys
import threading
from collections import Counter
from tempfile import NamedTemporaryFile
from threading import Timer
from typing import Callable, Dict, Iterable, List, Optional, Tuple, TypeVar, Union

import six

_logger = logging.getLogger(__name__)

ON_WINDOWS = os.name == "nt"
ON_LINUX = sys.platform == "linux"
ON_MAC = sys.platform == "darwin"


def setupLogging(stream, level):  # pragma: no cover
    "Setup logging according to the command line parameters"
    if isinstance(stream, six.string_types):
        _stream = open(stream, "a")
    else:
        _stream = stream

    handler = logging.StreamHandler(_stream)
    handler.formatter = logging.Formatter(
        "%(levelname)-7s | %(asctime)s | "
        + "%(name)s @ %(funcName)s():%(lineno)d %(threadName)s "
        + "|\t%(message)s",
        datefmt="%H:%M:%S",
    )

    logging.root.addHandler(handler)
    logging.root.setLevel(level)

    logging.getLogger("urllib3").setLevel(logging.WARNING)
    logging.getLogger("pynvim").setLevel(logging.WARNING)
    logging.getLogger("matplotlib").setLevel(logging.INFO)


# From here: http://stackoverflow.com/a/8536476/1672783
def terminateProcess(pid):
    "Terminate a process given its PID"

    if ON_WINDOWS:
        import ctypes  # pylint: disable=import-outside-toplevel

        process_terminate = 1
        handle = ctypes.windll.kernel32.OpenProcess(process_terminate, False, pid)
        ctypes.windll.kernel32.TerminateProcess(handle, -1)
        ctypes.windll.kernel32.CloseHandle(handle)
    else:
        os.kill(pid, signal.SIGTERM)


def isProcessRunning(pid):
    "Checks if a process is running given its PID"

    if ON_WINDOWS:
        return _isProcessRunningOnWindows(pid)

    return _isProcessRunningOnPosix(pid)


def _isProcessRunningOnPosix(pid):
    "Checks if a given PID is runnning under POSIX OSs"
    try:
        os.kill(pid, 0)
    except OSError:
        return False

    return True


def _isProcessRunningOnWindows(pid):
    """
    Enumerates active processes as seen under windows Task Manager on Win
    NT/2k/XP using PSAPI.dll (new api for processes) and using ctypes.Use it as
    you please.

    Based on information from
    http://support.microsoft.com/default.aspx?scid=KB;EN-US;Q175030&ID=KB;EN-US;Q175030

    By Eric Koome email ekoome@yahoo.com
    license GPL

    (adapted from code found at
    http://code.activestate.com/recipes/305279-getting-process-information-on-windows/)
    """
    from ctypes import (  # pylint: disable=import-outside-toplevel
        windll,
        c_ulong,
        sizeof,
        byref,
    )

    # PSAPI.DLL
    psapi = windll.psapi

    arr = c_ulong * 256
    list_of_pids = arr()
    cb = sizeof(list_of_pids)  # pylint: disable=invalid-name
    cb_needed = c_ulong()

    # Call Enumprocesses to get hold of process id's
    psapi.EnumProcesses(byref(list_of_pids), cb, byref(cb_needed))

    # Number of processes returned
    number_of_pids = int(cb_needed.value / sizeof(c_ulong()))

    pid_list = list(list_of_pids)[:number_of_pids]

    return int(pid) in pid_list


if not hasattr(p, "samefile"):

    def _samefile(file1, file2):
        """
        Emulated version of os.path.samefile. This is needed for Python
        2.7 running on Windows (at least on Appveyor CI)
        """

        return os.stat(file1) == os.stat(file2)


else:
    _samefile = p.samefile  # pylint: disable=invalid-name

samefile = _samefile  # pylint: disable=invalid-name


def removeDuplicates(seq):
    """
    Fast removal of duplicates within an iterable
    """
    seen = set()
    seen_add = seen.add

    return [x for x in seq if not (x in seen or seen_add(x))]


# Copied from ycmd
def toBytes(value):  # pragma: no cover
    """
    Consistently returns the new bytes() type from python-future.
    Assumes incoming strings are either UTF-8 or unicode (which is
    converted to UTF-8).
    """

    if not value:
        return bytes()

    # This is tricky. On py2, the bytes type from builtins (from python-future) is
    # a subclass of str. So all of the following are true:
    #   isinstance(str(), bytes)
    #   isinstance(bytes(), str)
    # But they don't behave the same in one important aspect: iterating over a
    # bytes instance yields ints, while iterating over a (raw, py2) str yields
    # chars. We want consistent behavior so we force the use of bytes().

    if isinstance(value, bytes):
        return value

    # This is meant to catch Python 2's native str type.

    if isinstance(value, bytes):
        return bytes(value, encoding="utf8")

    if isinstance(value, str):
        # On py2, with `from builtins import *` imported, the following is true:
        #
        #   bytes(str(u'abc'), 'utf8') == b"b'abc'"
        #
        # Obviously this is a bug in python-future. So we work around it. Also filed
        # upstream at: https://github.com/PythonCharmers/python-future/issues/193
        # We can't just return value.encode('utf8') on both py2 & py3 because on
        # py2 that *sometimes* returns the built-in str type instead of the newbytes
        # type from python-future.

        if six.PY2:
            return bytes(value.encode("utf8"), encoding="utf8")

        return bytes(value, encoding="utf8")

    # This is meant to catch `int` and similar non-string/bytes types.

    return toBytes(str(value))


def getTemporaryFilename(name):
    """
    Gets a temporary filename following the format 'hdl_checker_pid<>.log' on Linux
    and 'hdl_checker_pid<>_<unique>.log' on Windows
    """
    try:
        name, suffix = name.split(".")
    except ValueError:
        suffix = None

    basename = "hdl_checker_" + name + "_pid{}".format(os.getpid())

    if ON_WINDOWS:
        return NamedTemporaryFile(
            prefix=basename + "_", suffix="." + (suffix or "log"), delete=False
        ).name

    return p.join(p.sep, "tmp", basename + "." + (suffix or "log"))


def isFileReadable(path):
    # type: (str) -> bool
    """
    Checks if a given file is readable
    """
    try:
        open(path, "r").close()

        return True
    except IOError:
        return False


def runShellCommand(cmd_with_args, shell=False, env=None, cwd=None):
    # type: (Union[Tuple[str], List[str]], bool, Optional[Dict], Optional[str]) -> Iterable[str]
    """
    Runs a shell command and handles stdout catching
    """
    _logger.debug(" ".join(cmd_with_args))

    try:
        return (
            subp.check_output(
                cmd_with_args,
                stderr=subp.STDOUT,
                shell=shell,
                env=env or os.environ,
                cwd=cwd,
            )
            .decode(errors="replace")
            .splitlines()
        )
    except subp.CalledProcessError as exc:
        stdout = tuple(exc.output.decode(errors="replace").splitlines())
        _logger.debug(
            "Command '%s' failed with error code %d.\nStdout:\n%s",
            cmd_with_args,
            exc.returncode,
            "\n".join(stdout),
        )
        return stdout
    except OSError as exc:
        _logger.debug("Command '%s' failed with %s", cmd_with_args, exc)
        raise


def removeIfExists(filename):
    # type: (str) -> bool
    "Removes filename using os.remove and catches the exception if that fails"
    try:
        os.remove(filename)
        _logger.debug("Removed %s", filename)
        return True
    except OSError:
        _logger.debug("Failed to remove %s", filename)
        return False


def removeDirIfExists(dirname):
    # type: (str) -> bool
    """
    Removes the directory dirname using shutil.rmtree and catches the exception
    if that fails
    """
    try:
        shutil.rmtree(dirname)
        _logger.debug("Removed %s", dirname)
        return True
    except OSError:
        _logger.debug("Failed to remove %s", dirname)
        return False


class HashableByKey(object):  # pylint: disable=useless-object-inheritance
    """
    Implements hash and comparison operators properly across Python 2 and 3
    """

    __metaclass__ = abc.ABCMeta

    @property
    @abc.abstractmethod
    def __hash_key__(self):
        """ Implement this attribute to use it for hashing and comparing"""

    def __hash__(self):
        try:
            return hash(self.__hash_key__)
        except:  # pragma: no cover
            print("Couldn't hash %s" % repr(self.__hash_key__))
            raise

    def __eq__(self, other):
        """Overrides the default implementation"""

        if isinstance(other, self.__class__):
            return self.__hash_key__ == other.__hash_key__

        return NotImplemented  # pragma: no cover

    def __ne__(self, other):  # pragma: no cover
        """Overrides the default implementation (unnecessary in Python 3)"""
        result = self.__eq__(other)

        if result is not NotImplemented:
            return not result

        return NotImplemented


def logCalls(func):  # pragma: no cover
    # type: (Callable) -> Callable
    "Decorator to Log calls to func"

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        # type: (...) -> Callable
        _str = "%s(%s, %s)" % (func.__name__, args, pprint.pformat(kwargs))
        try:
            result = func(self, *args, **kwargs)
            _logger.debug("%s => %s", _str, repr(result))

            return result
        except:
            _logger.exception("Failed to run %s", _str)
            raise

    return wrapper


T = TypeVar("T")  # pylint: disable=invalid-name


def getMostCommonItem(items):
    # type: (Iterable[T]) -> T
    """
    Gets the most common item on an interable of items
    """
    data = Counter(items)
    return max(items, key=data.get)


def readFile(path):
    "Wrapper around open().read() that return \n for new lines"
    return open(path, mode="r", newline="\n", errors="replace").read()


REPO_URL = "https://github.com/suoto/hdl_checker"
_TAGS = re.compile(r"^\w+\s+refs\/tags\/v(?P<tag>(?:\d+\.){2}\d+)", flags=re.MULTILINE)

VersionFormat = Tuple[int, ...]


def _getLatestReleaseVersion():
    # type: () -> Optional[VersionFormat]
    """
    Return the latest tag from https://github.com/suoto/hdl_checker, striping
    the leading 'v' (so that v1.0.0 becomes simply 1.0.0). If the connection to
    the URL fails, return None
    """
    proc = subp.Popen(
        ["git", "ls-remote", "--tags", REPO_URL],
        env={"GIT_TERMINAL_PROMPT": "0"},
        stdout=subp.PIPE,
        stderr=subp.PIPE,
    )

    timer = Timer(5, proc.kill)
    timer.start()

    try:
        stdout, stderr = proc.communicate()
    finally:
        timer.cancel()

    if not stdout or stderr:
        _logger.info(
            "Couldn't fetch latest tag from %s: '%s'", REPO_URL, stderr.decode()
        )
        return None

    tags = tuple(
        tuple(int(x) for x in tag.split(".")) for tag in _TAGS.findall(stdout.decode())
    )

    if tags:
        return sorted(tags)[-1]

    _logger.warning("Unable to get version from '%s'", stdout)
    return None


_VERSION_FORMAT = re.compile(r"^\d+\.\d+\.\d+$")


def onNewReleaseFound(func):
    # type: (Callable[[str], None]) -> None
    """
    Checks if a new release is out and calls func if the running an older
    version
    """
    from hdl_checker import (  # pylint: disable=import-outside-toplevel
        __version__ as current,
    )

    # When installing via pip from github, versioneer will report the current
    # version as 0+unknown, in which case we won't notify
    if not _VERSION_FORMAT.match(current):
        return

    latest = _getLatestReleaseVersion()

    if not latest:
        return

    _logger.debug("Current version is %s, latest is %s", current, latest)

    if latest > tuple(int(x) for x in current.split(".")):
        func(
            "HDL Checker version {} is out! (current version is {})".format(
                ".".join(map(str, latest)), current
            )
        )


ENABLE_DEBOUNCE = True

# Copied from pyls (see
# https://github.com/palantir/python-language-server/blob/d81c7ba14d54b8e52192b0e00cbb4dacbb6f414d/pyls/_utils.py#L22-L47)
def debounce(interval_s, keyed_by=None):
    """Debounce calls to this function until interval_s seconds have passed."""

    def wrapper(func):
        timers = {}
        lock = threading.Lock()

        @functools.wraps(func)
        def debounced(*args, **kwargs):
            key = (
                inspect.signature(func).bind(*args, **kwargs).arguments[keyed_by]
                if keyed_by
                else None
            )

            # Could not find a way to disable debouncing whilist testing, so
            # we'll use a module var
            if not ENABLE_DEBOUNCE:
                return func(*args, **kwargs)

            def run():
                with lock:
                    del timers[key]
                return func(*args, **kwargs)

            with lock:
                old_timer = timers.get(key)
                if old_timer:
                    old_timer.cancel()

                timer = threading.Timer(interval_s, run)
                timers[key] = timer
                timer.start()

        return debounced

    return wrapper