Cloud-CV/EvalAI

scripts/monitoring/auto_scale_ec2_workers.py

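"""
Cron job that auto scales EC2 worker instances for EvalAI challenges.

For each challenge that uses an EC2 worker, the script checks the number of
pending submissions and the challenge end date via the EvalAI API, then starts
or stops the challenge's EC2 instance accordingly.
"""
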
import os
import pytz
import warnings
import boto3
from datetime import datetime
from dateutil.parser import parse
from evalai_interface import EvalAI_Interface

warnings.filterwarnings("ignore")

utc = pytz.UTC

ENV = os.environ.get("ENV", "dev")
evalai_endpoint = os.environ.get("API_HOST_URL", "http://localhost:8000")
auth_token = os.environ.get("AUTH_TOKEN")


def get_boto3_client(resource, aws_keys):
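    """Return a boto3 client for the given AWS resource, using the region and
    credentials provided in the aws_keys dict."""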
    client = boto3.client(
        resource,
        region_name=aws_keys["AWS_REGION"],
        aws_access_key_id=aws_keys["AWS_ACCESS_KEY_ID"],
        aws_secret_access_key=aws_keys["AWS_SECRET_ACCESS_KEY"],
    )
    return client


def get_pending_submission_count(challenge_metrics):
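    """Return the total number of submissions that are still in flight
    (running, submitted, queued, or resuming)."""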
    pending_submissions = 0
    for status in ["running", "submitted", "queued", "resuming"]:
        pending_submissions += challenge_metrics.get(status, 0)
    return pending_submissions


def stop_instance(challenge, evalai_interface):
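    """Stop the challenge's EC2 instance through the EvalAI API if it is
    currently running; otherwise log and skip."""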
    instance_details = evalai_interface.get_ec2_instance_details(challenge["id"])
    instance = instance_details["message"]
    if instance["State"]["Name"] == "running":
        response = evalai_interface.stop_challenge_ec2_instance(challenge["id"])
        print("AWS API Response: {}".format(response))
        print(
            "Stopped EC2 instance for Challenge ID: {}, Title: {}".format(
                challenge["id"], challenge["title"]
            )
        )
    else:
        print(
            "No running EC2 instance and pending messages found for Challenge ID: {}, Title: {}. Skipping.".format(
                challenge["id"], challenge["title"]
            )
        )


def start_instance(challenge, evalai_interface):
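    """Start the challenge's EC2 instance through the EvalAI API if it is
    currently stopped; otherwise log and skip."""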
    instance_details = evalai_interface.get_ec2_instance_details(challenge["id"])
    instance = instance_details["message"]
    if instance["State"]["Name"] == "stopped":
        response = evalai_interface.start_challenge_ec2_instance(challenge["id"])
        print("AWS API Response: {}".format(response))
        print(
            "Started EC2 instance for Challenge ID: {}, Title: {}.".format(
                challenge["id"], challenge["title"]
            )
        )
    else:
        print(
            "Existing running EC2 instance and pending messages found for Challenge ID: {}, Title: {}. Skipping.".format(
                challenge["id"], challenge["title"]
            )
        )


def start_or_stop_workers(challenge, evalai_interface):
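    """Reconcile a single challenge's EC2 worker: stop the instance when there
    are no pending submissions or the challenge has ended, otherwise start it."""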
    try:
        challenge_metrics = evalai_interface.get_challenge_submission_metrics_by_pk(
            challenge["id"]
        )
        pending_submissions = get_pending_submission_count(challenge_metrics)
    except Exception as e:
        print(
            "Unable to get the pending submissions for challenge ID: {}, Title: {}. Skipping.".format(
                challenge["id"], challenge["title"]
            )
        )
        print(e)
        return

    print("Pending Submissions: {}, Challenge PK: {}, Title: {}".format(pending_submissions, challenge["id"], challenge["title"]))

    if pending_submissions == 0 or parse(
        challenge["end_date"]
    ) < pytz.UTC.localize(datetime.utcnow()):
        stop_instance(challenge, evalai_interface)
    else:
        start_instance(challenge, evalai_interface)


# TODO: Factor in limits for the APIs
def start_or_stop_workers_for_challenges(response, evalai_interface):
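    """Start or stop workers for every challenge on this page of results that
    uses an EC2 worker, logging exceptions and continuing with the next one."""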
    for challenge in response["results"]:
        if challenge["uses_ec2_worker"]:
            try:
                start_or_stop_workers(challenge, evalai_interface)
            except Exception as e:
                print(e)


def create_evalai_interface(auth_token, evalai_endpoint):
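    """Construct the EvalAI API client used for all requests made by this script."""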
    evalai_interface = EvalAI_Interface(auth_token, evalai_endpoint)
    return evalai_interface


# Cron Job
def start_job():
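    """Fetch all challenges from the EvalAI API, following pagination, and
    reconcile the EC2 workers for each page of results."""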
    evalai_interface = create_evalai_interface(auth_token, evalai_endpoint)
    response = evalai_interface.get_challenges()
    start_or_stop_workers_for_challenges(response, evalai_interface)
    next_page = response["next"]
    while next_page is not None:
        response = evalai_interface.make_request(next_page, "GET")
        start_or_stop_workers_for_challenges(response, evalai_interface)
        next_page = response["next"]


if __name__ == "__main__":
    print("Starting EC2 workers auto scaling script")
    start_job()
    print("Quitting EC2 workers auto scaling script!")