whylabs/whylogs-python

View on GitHub
python/whylogs/api/usage_stats/__init__.py

Summary

Maintainability
A
0 mins
Test Coverage
import atexit
import hashlib
import http.client
import json
import logging
import os
import site
import socket
import sys
import uuid
from datetime import datetime
from threading import Thread
from typing import Any, Dict, List, Optional
from urllib import request

import whylogs

# Collector URL: production by default; a non-empty TELEMETRY_DEV environment
# variable redirects events to the staging collector instead.
_TELEMETRY_ENDPOINT = (
    "https://staging-stats.whylogs.com" if os.getenv("TELEMETRY_DEV") else "https://stats.whylogs.com/"
)
# strftime layout used for the "timestamp" field of every event.
_TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
logger = logging.getLogger(__name__)

# Environment variable whose mere presence turns usage statistics off.
ANALYTICS_OPT_OUT = "WHYLOGS_NO_ANALYTICS"

# Process-wide kill switch; flipped to True on opt-out or after a failed send.
_TELEMETRY_DISABLED = False
# Event names that have already been handed to a sender thread.
_TRACKED_EVENTS: Dict[str, bool] = dict()
# Directories probed by _has_lib when detecting installed integrations.
_SITE_PACKAGES: List[str] = list()

# Discover the site-packages directories that _has_lib will probe.
try:
    # Some virtualenv-provided `site` modules lack getsitepackages().
    if hasattr(site, "getsitepackages"):
        _SITE_PACKAGES = site.getsitepackages()
    else:
        # distutils was removed in Python 3.12 (PEP 632); sysconfig reports the
        # pure-Python library directory without relying on it.
        import sysconfig

        _SITE_PACKAGES = [sysconfig.get_path("purelib")]
except Exception:
    # Best effort only: without a site-packages list no integrations are reported.
    logger.debug("Encountered exception when checking site packages")

# Opt-out route 1: the opt-out environment variable is set (any value, even empty).
if ANALYTICS_OPT_OUT in os.environ:
    logger.debug("Opted out of usage statistics. Skipping.")
    _TELEMETRY_DISABLED = True

# Opt-out route 2: a marker file exists in the user's home directory.
# Any failure while probing the file system is treated as an opt-out.
try:
    if os.path.exists(os.path.expanduser("~/.whylogs/disable_telemetry")):
        _TELEMETRY_DISABLED = True
except:  # noqa
    logger.info("Encounter exception when checking file system. Disable telemetry by default")
    _TELEMETRY_DISABLED = True


def emit_usage(event: str) -> None:
    """Report ``event`` on a background thread, at most once per event name.

    Does nothing when telemetry is disabled or when the same event name has
    already been handed to a sender thread in this process.
    """
    if _TELEMETRY_DISABLED or _TRACKED_EVENTS.get(event):
        return

    # Mark the event before starting the worker so repeated calls are skipped.
    _TRACKED_EVENTS[event] = True

    worker = Thread(target=_do_emit_usage, args=(event,))
    worker.start()

    # Wait for the in-flight request during interpreter shutdown.
    atexit.register(worker.join)


# Lazily populated by _do_emit_usage the first time an event is sent and then
# reused for later events.
_metadata: Optional[Dict[str, Any]] = None  # cached result of _build_metadata()
_identity: Optional[str] = None  # cached result of _calc_identity()


def _do_emit_usage(event: str) -> None:
    """Thread target: send one usage event, building the cached identity/metadata if needed."""
    global _identity, _metadata

    # Re-check the flag here: it may have been flipped after the thread was created.
    if _TELEMETRY_DISABLED:
        logger.debug("Opted out of usage statistics. Skipping.")
        return

    logger.debug("Telemetry opted in. Emitting usage statistics")

    # Fill the module-level caches on first use.
    if _identity is None:
        _identity = _calc_identity()
    if _metadata is None:
        _metadata = _build_metadata()

    _send_stats_event(event, _identity, _metadata)


def _calc_identity() -> str:
    """Return an identifier for this machine.

    Returns:
        The SHA-512 hex digest of the host name, or a random UUID4 hex string
        when the host name cannot be obtained or encoded.
    """
    try:
        host_name = socket.gethostname()
        return hashlib.sha512(host_name.encode("utf8")).hexdigest()
    except (OSError, UnicodeError) as exc:
        # socket.timeout is a subclass of OSError, so the original case is still
        # covered; other lookup/encoding failures now also fall back instead of
        # raising inside the sender thread.
        logger.debug(
            "Unable to determine the computer name. Falling back to a random identity. Exception: %s",
            exc,
        )
        return uuid.uuid4().hex


def _build_metadata() -> Dict[str, Any]:
    """Collect the version, interpreter, platform and integration properties sent with each event."""

    if not hasattr(whylogs, "__version__"):
        import whylogs as why

        project_version = why.package_version()
    else:
        project_version = whylogs.__version__

    major, minor, micro = sys.version_info[:3]

    metadata = {
        "project_version": project_version,
        "python_version": f"{major}.{minor}.{micro}",
        "python_version_full": sys.version,
        "terminal": _get_terminal_mode(),
        "os": sys.platform,
        "conda": ("CONDA_DEFAULT_ENV" in os.environ),
        "venv": ("VIRTUAL_ENV" in os.environ),
        "environment": _get_environment(),
    }

    # Integrations to probe; only those that are found end up in the payload.
    candidate_libs = (
        "numpy",
        "pandas",
        "mlflow",
        "dask",
        "ray",
        "airflow",
        "pyspark",
        "flyte",
        "kafka",
        "langkit",
    )
    detected = {}
    for lib in candidate_libs:
        present = _has_lib(lib)
        if present is not False:
            detected[lib] = present

    metadata.update(detected)
    return metadata


def _send_stats_event(event_name: str, identity: str, properties: Optional[Dict[str, Any]] = None) -> None:
    """POST one usage event as JSON to the telemetry endpoint.

    Args:
        event_name: Name of the event being reported.
        identity: Identifier placed in the payload's "identity" field.
        properties: Extra key/value data; an empty dict is sent when omitted.

    Any failure to deliver the event (connection error or a non-200 status)
    sets the module-level ``_TELEMETRY_DISABLED`` flag; nothing is raised for
    transport errors.
    """
    global _TELEMETRY_DISABLED

    # Local import keeps this block self-contained; datetime.utcnow() is
    # deprecated since Python 3.12, and an aware UTC "now" formats identically.
    from datetime import timezone

    payload = {
        "identity": identity,
        "event": event_name,
        "timestamp": datetime.now(timezone.utc).strftime(_TIMESTAMP_FORMAT),
        "properties": properties or {},
    }
    req = request.Request(_TELEMETRY_ENDPOINT, data=json.dumps(payload).encode(), method="POST")
    req.add_header("Content-Type", "application/json")

    resp: http.client.HTTPResponse
    try:
        # The context manager closes the response on every exit path.
        with request.urlopen(req, timeout=3) as resp:
            if resp.status != 200:
                logger.info("Unable to send usage stats. Disabling whylogs api usage collection.")
                _TELEMETRY_DISABLED = True
            logger.debug("Response: %s", resp.read())
    except Exception:
        # Telemetry is best effort: never let a delivery problem surface to the user.
        logger.info("Connection error. Skip whylogs api usage collection.")
        logger.debug("Usage stats request failed", exc_info=True)
        _TELEMETRY_DISABLED = True


def _get_terminal_mode() -> str:
    """Return the active IPython shell's class name, else "shell" or "headless".

    "shell" is reported when ``sys.ps1`` exists, "headless" otherwise.
    """
    try:
        from IPython.core.getipython import get_ipython  # type: ignore

        active_shell = get_ipython()
        if active_shell is not None:
            return active_shell.__class__.__name__
    except:  # noqa
        # IPython missing or misbehaving: fall through to the plain checks.
        pass

    return "shell" if hasattr(sys, "ps1") else "headless"


def _get_environment() -> str:
    """Return a label for the first known hosting environment variable that is set, else "unknown"."""
    # Checked in order; the first variable present in os.environ wins.
    markers = (
        ("GITHUB_ACTION", "github_action"),
        ("GITLAB_CI", "gitlab_ci"),
        ("BINDER_PORT", "binder"),
        ("PYCHARM_HOSTED", "pycharm"),
        ("SM_CURRENT_HOST", "sagemaker"),
        ("DATABRICKS_RUNTIME_VERSION", "databricks"),
        ("COLAB_GPU", "colab"),
        ("KAGGLE_KERNEL_RUN_TYPE", "kaggle"),
        ("DEEPNOTE_PROJECT_ID", "deepnote"),
    )
    return next((label for env_var, label in markers if env_var in os.environ), "unknown")


def _has_lib(lib_name: str) -> bool:
    """Return True when ``lib_name`` exists as a path inside any known site-packages directory."""
    try:
        return any(os.path.exists(os.path.join(root, lib_name)) for root in _SITE_PACKAGES)
    except:  # noqa
        # Detection is best effort; any failure counts as "not installed".
        return False