Cloud-CV/EvalAI

View on GitHub
scripts/monitoring/auto_scale_eks_nodes.py

Summary

Maintainability
C
1 day
Test Coverage
import json
import os
import time
import warnings
from datetime import datetime

import boto3
import pytz
from dateutil.parser import parse
from evalai_interface import EvalAI_Interface

# Suppress every Python warning for the lifetime of this script.
warnings.filterwarnings("ignore")

utc = pytz.UTC

# Fallback AWS credentials, used for any challenge whose config entry has no
# "aws_keys" of its own. Every value is read from an "EKS_"-prefixed env var
# and is None when that variable is unset.
# NOTE: These are habitat challenge keys as most challenges are habitat
DEFAULT_AWS_EKS_KEYS = dict(
    (key, os.environ.get(env_name))
    for key, env_name in (
        ("AWS_ACCOUNT_ID", "EKS_AWS_ACCOUNT_ID"),
        ("AWS_ACCESS_KEY_ID", "EKS_AWS_ACCESS_KEY_ID"),
        ("AWS_SECRET_ACCESS_KEY", "EKS_AWS_SECRET_ACCESS_KEY"),
        ("AWS_REGION", "EKS_AWS_REGION"),
        ("AWS_STORAGE_BUCKET_NAME", "EKS_AWS_STORAGE_BUCKET_NAME"),
    )
)

# Cap on the nodegroup desired size when scaling up; a challenge entry may
# override it with its own "scale_up_desired_size".
SCALE_UP_DESIRED_SIZE = 1

# Env Variables
ENV = os.environ.get("ENV", "production")
STAFF_AUTH_TOKEN = os.environ.get("AUTH_TOKEN")
EVALAI_ENDPOINT = os.environ.get("API_HOST_URL", "https://eval.ai")

# Path to the JSON config that maps each challenge PK to its settings
# ("auth_token" is required; "scale_up_desired_size" and "aws_keys" are
# optional -- see start_job). open() does not expand "~", so expand it here.
# Without this, the default path (or any "~/..." JSON_PATH) can never be found.
json_path = os.path.expanduser(
    os.environ.get("JSON_PATH", "~/prod_eks_auth_tokens.json")
)
# QUEUES
with open(json_path, "r", encoding="utf-8") as f:
    # Load the JSON data into a Python dictionary
    INCLUDED_CHALLENGE_PKS = json.load(f)


def create_evalai_interface(auth_token):
    """Return an ``EvalAI_Interface`` for ``auth_token`` pointed at ``EVALAI_ENDPOINT``."""
    return EvalAI_Interface(auth_token, EVALAI_ENDPOINT)


def get_boto3_client(resource, aws_keys):
    """Create a boto3 client for the AWS service named ``resource``.

    Only the "AWS_REGION", "AWS_ACCESS_KEY_ID" and "AWS_SECRET_ACCESS_KEY"
    entries of ``aws_keys`` are read. A missing entry raises ``KeyError``.
    """
    credentials = {
        "region_name": aws_keys["AWS_REGION"],
        "aws_access_key_id": aws_keys["AWS_ACCESS_KEY_ID"],
        "aws_secret_access_key": aws_keys["AWS_SECRET_ACCESS_KEY"],
    }
    return boto3.client(resource, **credentials)


def get_nodegroup_name(eks_client, cluster_name):
    """Return the name of the first nodegroup listed for ``cluster_name``.

    Args:
        eks_client: boto3 EKS client (anything exposing ``list_nodegroups``).
        cluster_name: name of the EKS cluster to inspect.

    Raises:
        IndexError: if the cluster has no nodegroups. The exception type is the
            same as before, but the message now names the cluster so the
            failure is identifiable in the cron log.
    """
    nodegroups = eks_client.list_nodegroups(clusterName=cluster_name)["nodegroups"]
    if not nodegroups:
        raise IndexError(
            "No nodegroups found for EKS cluster: {}".format(cluster_name)
        )
    # NOTE(review): only the first nodegroup is ever scaled; presumably each
    # challenge cluster has exactly one -- confirm.
    return nodegroups[0]


def get_eks_meta(challenge, evalai_interface, aws_keys):
    """Look up the EKS handles needed to resize ``challenge``'s workers.

    A fresh EKS client is created on every call. The cluster name comes from
    EvalAI, and the nodegroup name from the cluster's first listed nodegroup.

    Returns:
        tuple: ``(eks_client, cluster_name, nodegroup_name)``.
    """
    # TODO: Check if eks_client should be a global thing. Clients must have an expiry/timeout.
    client = get_boto3_client("eks", aws_keys)
    cluster_details = evalai_interface.get_aws_eks_cluster_details(challenge["id"])
    cluster = cluster_details["name"]
    return client, cluster, get_nodegroup_name(client, cluster)


def get_scaling_config(eks_client, cluster_name, nodegroup_name):
    """Return the current ``scalingConfig`` mapping of an EKS nodegroup.

    Callers read its "minSize" and "desiredSize" entries.
    """
    description = eks_client.describe_nodegroup(
        clusterName=cluster_name,
        nodegroupName=nodegroup_name,
    )
    return description["nodegroup"]["scalingConfig"]


def start_eks_worker(challenge, pending_submissions, evalai_interface, aws_keys, new_desired_size):
    """Scale ``challenge``'s EKS nodegroup up to ``new_desired_size`` nodes.

    minSize is pinned to 1. maxSize is raised to the larger of
    ``new_desired_size`` and ``pending_submissions``.

    Returns:
        The raw ``update_nodegroup_config`` API response.
    """
    eks_client, cluster_name, nodegroup_name = get_eks_meta(challenge, evalai_interface, aws_keys)
    upper_bound = max(new_desired_size, pending_submissions)
    target = {"minSize": 1, "maxSize": upper_bound, "desiredSize": new_desired_size}
    return eks_client.update_nodegroup_config(
        clusterName=cluster_name,
        nodegroupName=nodegroup_name,
        scalingConfig=target,
    )


def stop_eks_worker(challenge, evalai_interface, aws_keys):
    """Scale ``challenge``'s EKS nodegroup down to zero desired nodes.

    Sets minSize=0, maxSize=1 and desiredSize=0. maxSize stays at 1,
    presumably because EKS rejects a maxSize below 1 (confirm).

    Returns:
        The raw ``update_nodegroup_config`` API response.
    """
    eks_client, cluster_name, nodegroup_name = get_eks_meta(challenge, evalai_interface, aws_keys)
    zero_nodes = {"minSize": 0, "maxSize": 1, "desiredSize": 0}
    return eks_client.update_nodegroup_config(
        clusterName=cluster_name,
        nodegroupName=nodegroup_name,
        scalingConfig=zero_nodes,
    )


def get_pending_submission_count(challenge_metrics):
    """Sum the submission counts for the not-yet-finished statuses.

    Adds up the "running", "submitted", "queued" and "resuming" entries of
    ``challenge_metrics``. A status missing from the mapping counts as 0.
    """
    active_statuses = ("running", "submitted", "queued", "resuming")
    return sum(challenge_metrics.get(status, 0) for status in active_statuses)


def scale_down_workers(challenge, desired_size, evalai_interface, aws_keys):
    """Scale ``challenge``'s nodegroup to zero if it currently wants any nodes.

    ``desired_size`` is the nodegroup's current desired size. When it is not
    positive, nothing is changed and a skip message is printed.
    """
    if not desired_size > 0:
        print(
            "No workers and pending submissions found for Challenge ID: {}, Title: {}. Skipping.".format(
                challenge["id"], challenge["title"]
            )
        )
        return

    api_response = stop_eks_worker(challenge, evalai_interface, aws_keys)
    print("AWS API Response: {}".format(api_response))
    print(
        "Decreased nodegroup sizes for Challenge ID: {}, Title: {}.".format(
            challenge["id"], challenge["title"]
        )
    )


def scale_up_workers(challenge, original_desired_size, pending_submissions, evalai_interface, aws_keys, new_desired_size):
    """Grow ``challenge``'s nodegroup to ``new_desired_size`` if that is larger.

    When ``new_desired_size`` does not exceed ``original_desired_size``,
    nothing is changed and a skip message is printed.
    """
    if not original_desired_size < new_desired_size:
        print(
            "Existing workers and pending submissions found for Challenge ID: {}, Title: {}. Skipping.".format(
                challenge["id"], challenge["title"]
            )
        )
        return

    api_response = start_eks_worker(
        challenge, pending_submissions, evalai_interface, aws_keys, new_desired_size
    )
    print("AWS API Response: {}".format(api_response))
    print(
        "Increased nodegroup sizes for Challenge ID: {}, Title: {}.".format(
            challenge["id"], challenge["title"]
        )
    )


def _challenge_has_ended(challenge):
    """Return True if ``challenge["end_date"]`` is earlier than the current UTC time.

    A timezone-naive end date is treated as UTC. Comparing it directly with the
    timezone-aware "now" would raise ``TypeError``.
    """
    end_date = parse(challenge["end_date"])
    if end_date.tzinfo is None:
        # NOTE(review): assumes naive timestamps from the API are UTC -- confirm.
        end_date = pytz.UTC.localize(end_date)
    # datetime.now(tz) replaces the deprecated datetime.utcnow() + localize().
    return end_date < datetime.now(pytz.UTC)


def scale_up_or_down_workers(challenge, evalai_interface, staff_evalai_interface, aws_keys, scale_up_desired_size):
    """Resize ``challenge``'s EKS nodegroup based on its pending submissions.

    - No pending submissions, or the challenge has ended: scale down to zero
      (via ``scale_down_workers``).
    - More pending submissions than the current desired size: scale up to
      ``min(pending_submissions, scale_up_desired_size)``.
    - Otherwise: leave the nodegroup alone and print a skip message.

    If the submission metrics cannot be fetched, the error is printed and the
    function returns without touching the nodegroup. Errors from the EKS
    lookups propagate to the caller.
    """
    try:
        challenge_metrics = staff_evalai_interface.get_challenge_submission_metrics_by_pk(challenge["id"])
        pending_submissions = get_pending_submission_count(challenge_metrics)
    except Exception as e:
        print(
            "Unable to get the pending submissions for challenge ID: {}, Title: {}. Skipping.".format(
                challenge["id"], challenge["title"]
            )
        )
        print(e)
        return

    eks_client, cluster_name, nodegroup_name = get_eks_meta(
        challenge, evalai_interface, aws_keys
    )
    scaling_config = get_scaling_config(eks_client, cluster_name, nodegroup_name)
    min_size = scaling_config["minSize"]
    original_desired_size = scaling_config["desiredSize"]
    print(
        "Challenge ID : {}, Title: {}".format(
            challenge["id"], challenge["title"]
        )
    )
    print(
        "Min Size: {}, Desired Size: {}, Pending Submissions: {}".format(
            min_size, original_desired_size, pending_submissions
        )
    )

    # Short-circuit: the end date is only parsed when something is pending.
    if pending_submissions == 0 or _challenge_has_ended(challenge):
        scale_down_workers(challenge, original_desired_size, evalai_interface, aws_keys)
    elif pending_submissions > original_desired_size:
        # Scale up again if needed, capped at scale_up_desired_size.
        new_desired_size = min(pending_submissions, scale_up_desired_size)
        scale_up_workers(
            challenge,
            original_desired_size,
            pending_submissions,
            evalai_interface,
            aws_keys,
            new_desired_size,
        )
    else:
        print(
            "Existing workers and pending submissions found for Challenge ID: {}, Title: {}. Skipping.".format(
                challenge["id"], challenge["title"]
            )
        )


# Cron Job
def start_job():
    """Run one auto-scaling pass over every entry in ``INCLUDED_CHALLENGE_PKS``.

    Each entry must provide "auth_token". The optional "scale_up_desired_size"
    and "aws_keys" entries default to ``SCALE_UP_DESIRED_SIZE`` and
    ``DEFAULT_AWS_EKS_KEYS``. Challenges that are not docker-based, or that use
    remote evaluation, are skipped. Any other error for one challenge is
    printed and does not stop the remaining challenges.

    Raises:
        NotImplementedError: if an entry lacks "auth_token". This aborts the
            whole pass, as before.
    """
    # Get metrics
    staff_evalai_interface = create_evalai_interface(STAFF_AUTH_TOKEN)

    for challenge_id, details in INCLUDED_CHALLENGE_PKS.items():
        # Auth Token
        if "auth_token" not in details:
            raise NotImplementedError("auth_token is needed for all challenges")

        scale_up_desired_size = details.get("scale_up_desired_size", SCALE_UP_DESIRED_SIZE)
        aws_keys = details.get("aws_keys", DEFAULT_AWS_EKS_KEYS)

        try:
            evalai_interface = create_evalai_interface(details["auth_token"])
            challenge = evalai_interface.get_challenge_by_pk(challenge_id)
            # Explicit check instead of `assert`: asserts are stripped under
            # `python -O`, which would let ineligible challenges be scaled.
            if not challenge["is_docker_based"] or challenge["remote_evaluation"]:
                print(
                    "Challenge ID: {}, Title: {} is either not docker-based or remote-evaluation. Skipping.".format(
                        challenge["id"], challenge["title"]
                    )
                )
                continue
            scale_up_or_down_workers(challenge, evalai_interface, staff_evalai_interface, aws_keys, scale_up_desired_size)
            time.sleep(1)
        except Exception as e:
            # Per-challenge boundary: log with context and move on to the next one.
            print("Error while processing challenge PK {}: {}".format(challenge_id, e))


if __name__ == "__main__":
    # Script entry point: one auto-scaling pass, bracketed by start/quit banners.
    print("Starting EKS nodegroup auto scaling script")
    start_job()
    print("Quitting EKS nodegroup auto scaling script!")