airbnb/caravel

View on GitHub
scripts/cancel_github_workflows.py

Summary

Maintainability
A
1 hr
Test Coverage
#!/usr/bin/env python3
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Manually cancel previous GitHub Action workflow runs in queue.

Example:
  # Set up
  export GITHUB_TOKEN={{ your personal github access token }}
  export GITHUB_REPOSITORY=apache/superset

  # cancel previous jobs for a PR, will even cancel the running ones
  ./cancel_github_workflows.py 1042

  # cancel previous jobs for a branch
  ./cancel_github_workflows.py my-branch

  # cancel all jobs of a PR, including the latest runs
  ./cancel_github_workflows.py 1024 --include-last
"""

import os
from collections.abc import Iterable, Iterator
from typing import Any, Literal, Optional, Union

import click
import requests
from click.exceptions import ClickException
from dateutil import parser

github_token = os.environ.get("GITHUB_TOKEN")
github_repo = os.environ.get("GITHUB_REPOSITORY", "apache/superset")


def request(
    method: Literal["GET", "POST", "DELETE", "PUT"], endpoint: str, **kwargs: Any
) -> dict[str, Any]:
    resp = requests.request(
        method,
        f"https://api.github.com/{endpoint.lstrip('/')}",
        headers={"Authorization": f"Bearer {github_token}"},
        **kwargs,
    ).json()
    if "message" in resp:
        raise ClickException(f"{endpoint} >> {resp['message']} <<")
    return resp


def list_runs(
    repo: str,
    params: Optional[dict[str, str]] = None,
) -> Iterator[dict[str, Any]]:
    """List all github workflow runs.
    Returns:
      An iterator that will iterate through all pages of matching runs."""
    if params is None:
        params = {}
    page = 1
    total_count = 10000
    while page * 100 < total_count:
        result = request(
            "GET",
            f"/repos/{repo}/actions/runs",
            params={**params, "per_page": 100, "page": page},
        )
        total_count = result["total_count"]
        yield from result["workflow_runs"]
        page += 1


def cancel_run(repo: str, run_id: Union[str, int]) -> dict[str, Any]:
    return request("POST", f"/repos/{repo}/actions/runs/{run_id}/cancel")


def get_pull_request(repo: str, pull_number: Union[str, int]) -> dict[str, Any]:
    return request("GET", f"/repos/{repo}/pulls/{pull_number}")


def get_runs(
    repo: str,
    branch: Optional[str] = None,
    user: Optional[str] = None,
    statuses: Iterable[str] = ("queued", "in_progress"),
    events: Iterable[str] = ("pull_request", "push"),
) -> list[dict[str, Any]]:
    """Get workflow runs associated with the given branch"""
    return [
        item
        for event in events
        for status in statuses
        for item in list_runs(repo, {"event": event, "status": status})
        if (branch is None or (branch == item["head_branch"]))
        and (user is None or (user == item["head_repository"]["owner"]["login"]))
    ]


def print_commit(commit: dict[str, Any], branch: str) -> None:
    """Print out commit message for verification"""
    indented_message = "    \n".join(commit["message"].split("\n"))
    date_str = (
        parser.parse(commit["timestamp"])
        .astimezone(tz=None)
        .strftime("%a, %d %b %Y %H:%M:%S")
    )
    print(
        f"""HEAD {commit["id"]} ({branch})
Author: {commit["author"]["name"]} <{commit["author"]["email"]}>
Date:   {date_str}

    {indented_message}
"""
    )


@click.command()
@click.option(
    "--repo",
    default=github_repo,
    help="The github repository name. For example, apache/superset.",
)
@click.option(
    "--event",
    type=click.Choice(["pull_request", "push", "issue"]),
    default=["pull_request", "push"],
    show_default=True,
    multiple=True,
)
@click.option(
    "--include-last/--skip-last",
    default=False,
    show_default=True,
    help="Whether to also cancel the latest run.",
)
@click.option(
    "--include-running/--skip-running",
    default=True,
    show_default=True,
    help="Whether to also cancel running workflows.",
)
@click.argument("branch_or_pull", required=False)
def cancel_github_workflows(
    branch_or_pull: Optional[str],
    repo: str,
    event: list[str],
    include_last: bool,
    include_running: bool,
) -> None:
    """Cancel running or queued GitHub workflows by branch or pull request ID"""
    if not github_token:
        raise ClickException("Please provide GITHUB_TOKEN as an env variable")

    statuses = ("queued", "in_progress") if include_running else ("queued",)
    events = event
    pr = None

    if branch_or_pull is None:
        title = "all jobs" if include_last else "all duplicate jobs"
    elif branch_or_pull.isdigit():
        pr = get_pull_request(repo, pull_number=branch_or_pull)
        title = f"pull request #{pr['number']} - {pr['title']}"
    else:
        title = f"branch [{branch_or_pull}]"

    print(
        f"\nCancel {'active' if include_running else 'previous'} "
        f"workflow runs for {title}\n"
    )

    if pr:
        runs = get_runs(
            repo,
            statuses=statuses,
            events=event,
            branch=pr["head"]["ref"],
            user=pr["user"]["login"],
        )
    else:
        user = None
        branch = branch_or_pull
        if branch and ":" in branch:
            [user, branch] = branch.split(":", 2)
        runs = get_runs(
            repo,
            branch=branch,
            user=user,
            statuses=statuses,
            events=events,
        )

    # sort old jobs to the front, so to cancel older jobs first
    runs = sorted(runs, key=lambda x: x["created_at"])
    if runs:
        print(
            f"Found {len(runs)} potential runs of\n"
            f"   status: {statuses}\n   event: {events}\n"
        )
    else:
        print(f"No {' or '.join(statuses)} workflow runs found.\n")
        return

    if not include_last:
        # Keep the latest run for each workflow and cancel all others
        seen = set()
        dups = []
        for item in reversed(runs):
            key = f'{item["event"]}_{item["head_branch"]}_{item["workflow_id"]}'
            if key in seen:
                dups.append(item)
            else:
                seen.add(key)
        if not dups:
            print(
                "Only the latest runs are in queue. "
                "Use --include-last to force cancelling them.\n"
            )
            return
        runs = dups[::-1]

    last_sha = None

    print(f"\nCancelling {len(runs)} jobs...\n")
    for entry in runs:
        head_commit = entry["head_commit"]
        if head_commit["id"] != last_sha:
            last_sha = head_commit["id"]
            print("")
            print_commit(head_commit, entry["head_branch"])
        try:
            print(f"[{entry['status']}] {entry['name']}", end="\r")
            cancel_run(repo, entry["id"])
            print(f"[Canceled] {entry['name']}     ")
        except ClickException as error:
            print(f"[Error: {error.message}] {entry['name']}    ")
    print("")


if __name__ == "__main__":
    # pylint: disable=no-value-for-parameter
    cancel_github_workflows()