scripts/cancel_github_workflows.py
#!/usr/bin/env python3
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Manually cancel previous GitHub Action workflow runs in queue.
Example:
# Set up
export GITHUB_TOKEN={{ your personal github access token }}
export GITHUB_REPOSITORY=apache/superset
# cancel previous jobs for a PR, will even cancel the running ones
./cancel_github_workflows.py 1042
# cancel previous jobs for a branch
./cancel_github_workflows.py my-branch
# cancel all jobs of a PR, including the latest runs
./cancel_github_workflows.py 1024 --include-last
"""
import os
from collections.abc import Iterable, Iterator
from typing import Any, Literal, Optional, Union
import click
import requests
from click.exceptions import ClickException
from dateutil import parser
github_token = os.environ.get("GITHUB_TOKEN")
github_repo = os.environ.get("GITHUB_REPOSITORY", "apache/superset")
def request(
method: Literal["GET", "POST", "DELETE", "PUT"], endpoint: str, **kwargs: Any
) -> dict[str, Any]:
resp = requests.request(
method,
f"https://api.github.com/{endpoint.lstrip('/')}",
headers={"Authorization": f"Bearer {github_token}"},
**kwargs,
).json()
if "message" in resp:
raise ClickException(f"{endpoint} >> {resp['message']} <<")
return resp
def list_runs(
repo: str,
params: Optional[dict[str, str]] = None,
) -> Iterator[dict[str, Any]]:
"""List all github workflow runs.
Returns:
An iterator that will iterate through all pages of matching runs."""
if params is None:
params = {}
page = 1
total_count = 10000
while page * 100 < total_count:
result = request(
"GET",
f"/repos/{repo}/actions/runs",
params={**params, "per_page": 100, "page": page},
)
total_count = result["total_count"]
yield from result["workflow_runs"]
page += 1
def cancel_run(repo: str, run_id: Union[str, int]) -> dict[str, Any]:
return request("POST", f"/repos/{repo}/actions/runs/{run_id}/cancel")
def get_pull_request(repo: str, pull_number: Union[str, int]) -> dict[str, Any]:
return request("GET", f"/repos/{repo}/pulls/{pull_number}")
def get_runs(
repo: str,
branch: Optional[str] = None,
user: Optional[str] = None,
statuses: Iterable[str] = ("queued", "in_progress"),
events: Iterable[str] = ("pull_request", "push"),
) -> list[dict[str, Any]]:
"""Get workflow runs associated with the given branch"""
return [
item
for event in events
for status in statuses
for item in list_runs(repo, {"event": event, "status": status})
if (branch is None or (branch == item["head_branch"]))
and (user is None or (user == item["head_repository"]["owner"]["login"]))
]
def print_commit(commit: dict[str, Any], branch: str) -> None:
"""Print out commit message for verification"""
indented_message = " \n".join(commit["message"].split("\n"))
date_str = (
parser.parse(commit["timestamp"])
.astimezone(tz=None)
.strftime("%a, %d %b %Y %H:%M:%S")
)
print(
f"""HEAD {commit["id"]} ({branch})
Author: {commit["author"]["name"]} <{commit["author"]["email"]}>
Date: {date_str}
{indented_message}
"""
)
@click.command()
@click.option(
"--repo",
default=github_repo,
help="The github repository name. For example, apache/superset.",
)
@click.option(
"--event",
type=click.Choice(["pull_request", "push", "issue"]),
default=["pull_request", "push"],
show_default=True,
multiple=True,
)
@click.option(
"--include-last/--skip-last",
default=False,
show_default=True,
help="Whether to also cancel the latest run.",
)
@click.option(
"--include-running/--skip-running",
default=True,
show_default=True,
help="Whether to also cancel running workflows.",
)
@click.argument("branch_or_pull", required=False)
def cancel_github_workflows(
branch_or_pull: Optional[str],
repo: str,
event: list[str],
include_last: bool,
include_running: bool,
) -> None:
"""Cancel running or queued GitHub workflows by branch or pull request ID"""
if not github_token:
raise ClickException("Please provide GITHUB_TOKEN as an env variable")
statuses = ("queued", "in_progress") if include_running else ("queued",)
events = event
pr = None
if branch_or_pull is None:
title = "all jobs" if include_last else "all duplicate jobs"
elif branch_or_pull.isdigit():
pr = get_pull_request(repo, pull_number=branch_or_pull)
title = f"pull request #{pr['number']} - {pr['title']}"
else:
title = f"branch [{branch_or_pull}]"
print(
f"\nCancel {'active' if include_running else 'previous'} "
f"workflow runs for {title}\n"
)
if pr:
runs = get_runs(
repo,
statuses=statuses,
events=event,
branch=pr["head"]["ref"],
user=pr["user"]["login"],
)
else:
user = None
branch = branch_or_pull
if branch and ":" in branch:
[user, branch] = branch.split(":", 2)
runs = get_runs(
repo,
branch=branch,
user=user,
statuses=statuses,
events=events,
)
# sort old jobs to the front, so to cancel older jobs first
runs = sorted(runs, key=lambda x: x["created_at"])
if runs:
print(
f"Found {len(runs)} potential runs of\n"
f" status: {statuses}\n event: {events}\n"
)
else:
print(f"No {' or '.join(statuses)} workflow runs found.\n")
return
if not include_last:
# Keep the latest run for each workflow and cancel all others
seen = set()
dups = []
for item in reversed(runs):
key = f'{item["event"]}_{item["head_branch"]}_{item["workflow_id"]}'
if key in seen:
dups.append(item)
else:
seen.add(key)
if not dups:
print(
"Only the latest runs are in queue. "
"Use --include-last to force cancelling them.\n"
)
return
runs = dups[::-1]
last_sha = None
print(f"\nCancelling {len(runs)} jobs...\n")
for entry in runs:
head_commit = entry["head_commit"]
if head_commit["id"] != last_sha:
last_sha = head_commit["id"]
print("")
print_commit(head_commit, entry["head_branch"])
try:
print(f"[{entry['status']}] {entry['name']}", end="\r")
cancel_run(repo, entry["id"])
print(f"[Canceled] {entry['name']} ")
except ClickException as error:
print(f"[Error: {error.message}] {entry['name']} ")
print("")
if __name__ == "__main__":
# pylint: disable=no-value-for-parameter
cancel_github_workflows()