firehol/netdata

View on GitHub
.github/scripts/netdata-pkgcloud-cleanup.py

Summary

Maintainability
B
4 hrs
Test Coverage
#!/bin/env python3

import requests
from requests.auth import HTTPBasicAuth
from datetime import date, datetime, timedelta
import os
import sys
import argparse
from pprint import pprint
from datetime import datetime
from dateutil import parser


class PackageCloud:
    NUM_PACKAGE_MINOR_TO_KEEP = 5
    NUM_RETENTION_DAYS = 30
    # number of pages to process. Use '0' to process all
    MAX_PAGES = 0

    def __init__(self, repo_type, dry_run=True, auth_token=None):
        self.headers = {
            "Accept" : "application/json",
            "Content-Type" : "application/json",
        }
        self.dry_run = dry_run
        self.repo_type = repo_type
        if repo_type == "stable":
            repo = "netdata/netdata"
        elif repo_type == "devel":
            repo = "netdata/netdata-devel"
        elif repo_type == "edge":
            repo = "netdata/netdata-edge"
        else:
            print(f"ERROR: unknown repo type '{repo_type}'!\nAccepted values are: stable,devel,edge")
            sys.exit(1)
        self.base_url = f"https://packagecloud.io/api/v1/repos/{repo}"
        self.auth = HTTPBasicAuth(username=auth_token, password='') if auth_token else None

    def get_all_packages(self):
        page = 1
        all_pkg_list = []
        while True:
            url = f"{self.base_url}/packages.json?page={page}"
            if page > self.MAX_PAGES and self.MAX_PAGES != 0:
                break
            else:
                pkg_list = requests.get(url, auth=self.auth, headers=self.headers).json()
                if len(pkg_list) == 0:
                    break
                else:
                    print(f"Processing page: {page}")
                    for element in pkg_list:
                        self.is_pkg_older_than_days(element, 30)
                        if element['name'] != 'netdata-repo' and element['name'] != 'netdata-repo-edge':
                            all_pkg_list.append(element)
            page += 1
        return all_pkg_list

    def delete_package(self, destroy_url):
        if self.dry_run:
            print(f"         - DRY_RUN mode. Not deleting package '{destroy_url}'.")
        else:
            print(f"         - Deleting package: {destroy_url}")
            url = f"https://packagecloud.io{destroy_url}"
            response = requests.delete(url, auth=self.auth, headers=self.headers).json()
            response = None
            if not response:
                print(f"           Package deleted successfully.")
            else:
                print(f"           Failed deleting package!")

    def get_destroy_url(self, pkg_url):
        url = f"https://packagecloud.io{pkg_url}"
        response = requests.get(url, auth=self.auth, headers=self.headers)
        response.raise_for_status()
        return response.json()['destroy_url']

    def get_packages_for_distro(self, distro, all_pkg_list):
        distro_pkg_list = [ pkg for pkg in all_pkg_list if pkg['distro_version'] == distro ]
        return distro_pkg_list

    def get_packages_for_arch(self, arch, all_pkg_list):
        arch_pkg_list = [ pkg for pkg in all_pkg_list if pkg['package_url'].split('/')[11] == arch ]
        return arch_pkg_list

    def get_arches(self, pkg_list):
        arches = list(set([pkg['package_url'].split('/')[11] for pkg in pkg_list ]))
        return arches

    def get_pkg_list(self, pkg_name, pkg_list):
        filtered_list = [ pkg for pkg in pkg_list if pkg['name'] == pkg_name ]
        return filtered_list

    def get_minor_versions(self, all_versions):
        minor_versions = ['.'.join(version.split('.')[:-1]) for version in all_versions ]
        minor_versions = list(set(minor_versions))
        minor_versions.sort()
        return minor_versions

    def is_pkg_older_than_days(self, pkg, num_days):
        pkg_create_date = datetime.strptime(pkg['created_at'], '%Y-%m-%dT%H:%M:%S.%fZ')
        time_difference = datetime.now() - pkg_create_date
        return time_difference.days > num_days

    def cleanup_repo(self):
        if self.repo_type == 'stable':
            self.cleanup_stable_repo()
        else:
            self.cleanup_edge_repo()

    def cleanup_edge_repo(self):
        all_pkg_list = self.get_all_packages()
        pkgs_to_delete = []
        pkgs_to_keep = []
        for package in all_pkg_list:
            if self.is_pkg_older_than_days(package, self.NUM_RETENTION_DAYS):
                pkgs_to_delete.append(package)
            else:
                pkgs_to_keep.append(package)
        print(f"Keeping the following packages (newer than {self.NUM_RETENTION_DAYS} days):")
        for pkg in pkgs_to_keep:
            print(f"    > pkg: {pkg['package_html_url']} / created_at: {pkg['created_at']}")
        print(f"Deleting the following packages (older than {self.NUM_RETENTION_DAYS} days):")
        for pkg in pkgs_to_delete:
            print(f"    > pkg: {pkg['package_html_url']} / created_at: {pkg['created_at']}")
            self.delete_package(pkg['destroy_url'])

    def cleanup_stable_repo(self):
        all_pkg_list = self.get_all_packages()
        all_distros = list(set([ pkg['distro_version'] for pkg in all_pkg_list ]))
        all_distros = sorted(all_distros)
        print(f"<> Distributions list: {all_distros}")

        for distro in all_distros:
            print(f">> Processing distro: {distro}")
            pkg_list_distro = self.get_packages_for_distro(distro, all_pkg_list)
            arches = self.get_arches(pkg_list_distro)
            print(f"   <> Arch list: {arches}")
            for arch in arches:
                print(f"   >> Processing arch: {distro} -> {arch}")
                pkg_list_arch = self.get_packages_for_arch(arch, pkg_list_distro)
                pkg_names = [pkg['name'] for pkg in pkg_list_arch]
                pkg_names = list(set(pkg_names))
                print(f"     <> Package names: {pkg_names}")
                for pkg_name in pkg_names:
                    print(f"     >> Processing package: {distro} -> {arch} -> {pkg_name}")
                    pkg_list = self.get_pkg_list(pkg_name, pkg_list_arch)
                    pkg_versions = [pkg['version'] for pkg in pkg_list]
                    pkg_minor_versions = self.get_minor_versions(pkg_versions)
                    pkg_minor_to_keep = pkg_minor_versions[-self.NUM_PACKAGE_MINOR_TO_KEEP:]
                    print(f"       <> Minor Package Versions to Keep: {pkg_minor_to_keep}")
                    pkg_minor_to_delete = list(set(pkg_minor_versions) - set(pkg_minor_to_keep))
                    print(f"       <> Minor Package Versions to Delete: {pkg_minor_to_delete}")
                    urls_to_keep = [pkg['package_url'] for pkg in pkg_list if '.'.join(pkg['version'].split('.')[:-1]) in pkg_minor_to_keep]
                    urls_to_delete = [pkg['package_url'] for pkg in pkg_list if '.'.join(pkg['version'].split('.')[:-1]) in pkg_minor_to_delete]
                    for pkg_url in urls_to_delete:
                        destroy_url = self.get_destroy_url(pkg_url)
                        self.delete_package(destroy_url)


def configure():
    parser = argparse.ArgumentParser()
    parser.add_argument('--repo-type', '-r', required=True,
                        help='Repository type against to perform cleanup')
    parser.add_argument('--dry-run', '-d', action='store_true',
                        help='Dry-run Mode')
    args = parser.parse_args()
    try:
        token = os.environ['PKGCLOUD_TOKEN']
    except Exception as e:
        print(f"FATAL: 'PKGCLOUD_TOKEN' environment variable is not set!", file=sys.stderr)
        sys.exit(1)
    repo_type = args.repo_type
    dry_run = args.dry_run
    conf = {
        'repo_type': args.repo_type,
        'dry_run': args.dry_run,
        'token': token
    }
    return conf


def main():
    config = configure()
    pkg_cloud = PackageCloud(config['repo_type'], config['dry_run'], config['token'])
    pkg_cloud.cleanup_repo()


if __name__ == "__main__":
    main()