src/skjold/sources/pyup.py

Summary

Maintainability
A
0 mins
Test Coverage
A
100%
import datetime
import json
import os
import urllib.request
from collections import defaultdict
from typing import Any, Callable, Dict, List, Tuple, Union

from packaging import specifiers
from packaging.utils import NormalizedName, canonicalize_name
from packaging.version import Version

from skjold.core import Dependency, SecurityAdvisory, SecurityAdvisorySource
from skjold.tasks import register_source


class PyUpSecurityAdvisory(SecurityAdvisory):
    _json: Dict[str, str]

    @classmethod
    def using(cls, name: str, json_: dict) -> "PyUpSecurityAdvisory":
        obj = cls()
        obj._json = json_
        obj._json["name"] = name
        return obj

    @property
    def identifier(self) -> str:
        return self._json["cve"]

    @property
    def source(self) -> str:
        return "pyup"

    @property
    def severity(self) -> str:
        return "UNKNOWN"

    @property
    def url(self) -> str:
        path = self._json.get("more_info_path")
        if not path:
            return ""
        return f"https://pyup.io{path}"

    @property
    def references(self) -> List[str]:
        return []

    @property
    def package_name(self) -> str:
        return self._json["name"]

    @property
    def canonical_name(self) -> NormalizedName:
        return canonicalize_name(self.package_name)

    @property
    def summary(self) -> str:
        return self._json["advisory"]

    @property
    def vulnerable_version_range(self) -> List[specifiers.SpecifierSet]:
        return [
            specifiers.SpecifierSet(v, prereleases=True) for v in self._json["specs"]
        ]

    @property
    def vulnerable_versions(self) -> str:
        return ",".join([str(x) for x in self.vulnerable_version_range])

    def is_affected(self, version: str) -> bool:
        version_ = Version(version)
        allows_: Callable[[specifiers.SpecifierSet], bool] = (
            lambda x: True if version_ in x else False
        )
        # affected_versions = map(lambda x: x.allows(version), self.vulnerable_version_range)
        affected_versions = map(allows_, self.vulnerable_version_range)
        return any(affected_versions)


class PyUp(SecurityAdvisorySource):

    _url: str = "https://raw.githubusercontent.com/pyupio/safety-db/master/data/insecure_full.json"
    _name: str = "pyup"
    _metadata: Dict[str, Union[str, int]] = {}

    @property
    def name(self) -> str:
        return self._name

    @property
    def total_count(self) -> int:
        return len(self._advisories.keys())

    @property
    def path(self) -> str:
        return os.path.join(self._cache_dir, "pyup.cache")

    @property
    def last_updated_at(self) -> datetime.datetime:
        timestamp = int(self._metadata["timestamp"])
        return datetime.datetime.utcfromtimestamp(timestamp)

    def _load_cache(self) -> Any:
        with open(self.path, "rb") as fh:
            json_ = json.load(fh)
            return json_

    def populate_from_cache(self) -> None:
        self._advisories = defaultdict(list)

        cache_ = self._load_cache()
        for package_name, advisories in cache_.items():
            if package_name in {"$meta"}:
                self._metadata = advisories
                continue

            for advisory in advisories:
                obj = PyUpSecurityAdvisory.using(package_name, advisory)
                self._advisories[obj.canonical_name].append(obj)

    def update(self) -> None:
        request_ = urllib.request.Request(
            url=self._url,
            headers={"Accept": "application/json"},
        )
        with urllib.request.urlopen(request_) as response:
            json_ = json.loads(response.read())

        with open(self.path, "w") as fh:
            json.dump(json_, fh)

    def has_security_advisory_for(self, dependency: Dependency) -> bool:
        return dependency.canonical_name in self.advisories.keys()

    def is_vulnerable_package(
        self, dependency: Dependency
    ) -> Tuple[bool, List[SecurityAdvisory]]:
        advisories = []
        for candidate in self.advisories[dependency.canonical_name]:
            if candidate.is_affected(dependency.version):
                advisories.append(candidate)

        return len(advisories) > 0, advisories


register_source("pyup", PyUp)