errbotio/errbot

View on GitHub
tools/plugin-gen.py

Summary

Maintainability
B
4 hrs
Test Coverage
#!/usr/bin/env python3
import configparser
import json
import logging
import os
import pathlib
import sys
import time
from datetime import datetime

import requests
from requests.auth import HTTPBasicAuth

logging.basicConfig()

log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)

DEFAULT_AVATAR = "https://upload.wikimedia.org/wikipedia/commons/5/5f/Err-logo.png"
BLACKLISTED = []

user_cache = {}
plugins = {}


def get_auth():
    """Get auth creds from Github Token

    token is generated from the personal tokens in github
    """
    token_file = pathlib.Path("token")
    token_env = os.getenv("ERRBOT_REPOS_TOKEN")

    if token_file.is_file():
        try:
            token_info = open("token", "r").read()
        except ValueError:
            log.fatal(
                "Token file cannot be properly read, should be of the form username:token"
            )
            sys.exit(-1)
    elif token_env:
        token_info = token_env
    else:
        msg = "No 'token' file or environment variable 'ERROBOT_REPOS_TOKEN' found."
        log.fatal(msg)
        sys.exit(-1)

    try:
        user, token = token_info.strip().split(":")
    except ValueError:
        msg = "Token file cannot be properly read, should be of the form username:token"
        log.fatal(msg)
        sys.exit(-1)

    auth = HTTPBasicAuth(user, token)
    return auth


AUTH = get_auth()


def add_blacklisted(repo):
    with open("blacklisted.txt", "a") as f:
        f.write(repo)
        f.write("\n")


def save_plugins():
    with open("repos.json", "w") as f:
        json.dump(plugins, f, indent=2, separators=(",", ": "))


def get_avatar_url(repo):
    username = repo.split("/")[0]
    if username in user_cache:
        user = user_cache[username]
    else:
        user_res = requests.get("https://api.github.com/users/" + username, auth=AUTH)
        user = user_res.json()
        if "avatar_url" in user:  # don't pollute the presistent cache
            user_cache[username] = user
            with open("user_cache", "w") as f:
                f.write(repr(user_cache))
        rate_limit(user_res)
    return user["avatar_url"] if "avatar_url" in user else DEFAULT_AVATAR


def rate_limit(resp):
    """
    Wait enough to be in the budget for this request.
    :param resp: the http response from github
    :return:
    """
    if "X-RateLimit-Remaining" not in resp.headers:
        log.info("No rate limit detected. Hum along...")
        return
    remain = int(resp.headers["X-RateLimit-Remaining"])
    limit = int(resp.headers["X-RateLimit-Limit"])
    log.info("Rate limiter: %s allowed out of %d", remain, limit)
    if remain > 1:  # margin by one request
        return
    reset = int(resp.headers["X-RateLimit-Reset"])
    ts = datetime.fromtimestamp(reset)
    delay = (ts - datetime.now()).total_seconds()
    log.info("Hit rate limit. Have to wait for %d seconds", delay)
    if delay < 0:  # time drift
        delay = 2
    time.sleep(delay)


def parse_date(gh_date: str) -> datetime:
    return datetime.strptime(gh_date, "%Y-%m-%dT%H:%M:%SZ")


def check_repo(repo):
    repo_name = repo.get("full_name", None)
    if repo_name is None:
        log.error("No name in %s", repo)
    log.debug("Checking %s...", repo_name)
    code_resp = requests.get(
        "https://api.github.com/search/code?q=extension:plug+repo:%s" % repo_name,
        auth=AUTH,
    )
    if code_resp.status_code != 200:
        log.error(
            "Error getting https://api.github.com/search/code?q=extension:plug+repo:%s",
            repo_name,
        )
        log.error("code %d", code_resp.status_code)
        log.error("content %s", code_resp.text)

        return
    plug_items = code_resp.json()["items"]
    if not plug_items:
        log.debug("No plugin found in %s, blacklisting it.", repo_name)
        add_blacklisted(repo_name)
        return
    owner = repo["owner"]
    avatar_url = owner["avatar_url"] if "avatar_url" in owner else DEFAULT_AVATAR

    days_old = (datetime.now() - parse_date(repo["updated_at"])).days
    score = (
        repo["stargazers_count"]
        + repo["watchers_count"] * 2
        + repo["forks_count"]
        - days_old / 25
    )

    for plug in plug_items:
        plugfile_resp = requests.get(
            "https://raw.githubusercontent.com/%s/master/%s" % (repo_name, plug["path"])
        )
        log.debug("Found a plugin:")
        log.debug("Repo:  %s", repo_name)
        log.debug("File:  %s", plug["path"])
        parser = configparser.ConfigParser()
        try:
            parser.read_string(plugfile_resp.text)

            name = parser["Core"]["Name"]
            log.debug("Name: %s", name)

            if "Documentation" in parser and "Description" in parser["Documentation"]:
                doc = parser["Documentation"]["Description"]
                log.debug("Documentation: %s", doc)
            else:
                doc = ""

            if "Python" in parser:
                python = parser["Python"]["Version"]
                log.debug("Python Version: %s", python)
            else:
                python = "2"

            plugin = {
                "path": plug["path"],
                "repo": repo["html_url"],
                "documentation": doc,
                "name": name,
                "python": python,
                "avatar_url": avatar_url,
                "score": score,
            }

            repo_entry = plugins.get(repo_name, {})
            repo_entry[name] = plugin
            plugins[repo_name] = repo_entry
            log.debug("Catalog added plugin %s.", plugin["name"])
        except:
            log.error("Invalid syntax in %s, skipping...", plug["path"])
            continue

        rate_limit(plugfile_resp)

    save_plugins()
    rate_limit(code_resp)


def find_plugins(query):
    url = (
        "https://api.github.com/search/repositories?q=%s+in:name+language:python&sort=stars&order=desc"
        % query
    )
    while True:
        repo_resp = requests.get(url, auth=AUTH)
        repo_json = repo_resp.json()
        if repo_json.get("message", None) == "Bad credentials":
            log.error("Invalid credentials, check your token file, see README.")
            sys.exit(-1)
        log.debug(
            "Repo reqs before ratelimit %s/%s",
            repo_resp.headers["X-RateLimit-Remaining"],
            repo_resp.headers["X-RateLimit-Limit"],
        )
        if "message" in repo_json and repo_json["message"].startswith(
            "API rate limit exceeded for"
        ):
            log.error("API rate limit hit anyway ... wait for 30s")
            time.sleep(30)
            continue
        items = repo_json["items"]

        for repo in items:
            if repo["full_name"] in BLACKLISTED:
                log.debug("Skipping %s.", repo)
                continue
            check_repo(repo)
        if "next" not in repo_resp.links:
            break
        url = repo_resp.links["next"]["url"]
        log.debug("Next url: %s", url)
        rate_limit(repo_resp)


def main():
    find_plugins("err")
    # Those are found by global search only available on github UI:
    # https://github.com/search?l=&q=Documentation+extension%3Aplug&ref=advsearch&type=Code&utf8=%E2%9C%93
    url = "https://api.github.com/repos/%s"
    with open("extras.txt", "r") as extras:
        for repo_name in extras:
            repo_name = repo_name.strip()
            repo_resp = requests.get(url % repo_name, auth=AUTH)
            repo = repo_resp.json()
            if repo.get("message", None) == "Bad credentials":
                log.error("Invalid credentials, check your token file, see README.")
                sys.exit(-1)
            if "message" in repo and repo["message"].startswith(
                "API rate limit exceeded for"
            ):
                log.error("API rate limit hit anyway ... wait for 30s")
                time.sleep(30)
                continue
            if "message" in repo and repo["message"].startswith("Not Found"):
                log.error("%s not found.", repo_name)
            else:
                check_repo(repo)
            rate_limit(repo_resp)


if __name__ == "__main__":
    try:
        with open("user_cache", "r") as f:
            user_cache = eval(f.read())
    except FileNotFoundError:
        # File doesn't exist, so we continue on
        log.info("No user cache existing, will be generating it for the first time.")

    try:
        with open("blacklisted.txt", "r") as f:
            BLACKLISTED = [line.strip() for line in f.readlines()]
    except FileNotFoundError:
        log.info("No blacklisted.txt found, no plugins will be blacklisted.")

    main()