Cloud-CV/EvalAI
apps/challenges/aws_utils.py

import json
import logging
import os
import random
import string
import uuid
import yaml

from botocore.exceptions import ClientError
from django.conf import settings
from django.core import serializers
from django.core.files.temp import NamedTemporaryFile
from http import HTTPStatus

from .challenge_notification_util import (
    construct_and_send_worker_start_mail,
    construct_and_send_eks_cluster_creation_mail,
)
from .task_definitions import (
    container_definition_code_upload_worker,
    container_definition_submission_worker,
    delete_service_args,
    task_definition,
    task_definition_code_upload_worker,
    task_definition_static_code_upload_worker,
    service_definition,
    update_service_args,
)

from base.utils import get_boto3_client, send_email
from evalai.celery import app
from accounts.models import JwtToken

logger = logging.getLogger(__name__)

DJANGO_SETTINGS_MODULE = os.environ.get("DJANGO_SETTINGS_MODULE")
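# e.g. DJANGO_SETTINGS_MODULE == "settings.production" yields ENV == "production"
# (module name illustrative)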
ENV = DJANGO_SETTINGS_MODULE.split(".")[-1]
EVALAI_DNS = os.environ.get("SERVICE_DNS")
aws_keys = {
    "AWS_ACCOUNT_ID": os.environ.get("AWS_ACCOUNT_ID", "x"),
    "AWS_ACCESS_KEY_ID": os.environ.get("AWS_ACCESS_KEY_ID", "x"),
    "AWS_SECRET_ACCESS_KEY": os.environ.get("AWS_SECRET_ACCESS_KEY", "x"),
    "AWS_REGION": os.environ.get("AWS_DEFAULT_REGION", "us-east-1"),
    "AWS_STORAGE_BUCKET_NAME": os.environ.get(
        "AWS_STORAGE_BUCKET_NAME", "evalai-s3-bucket"
    ),
}


COMMON_SETTINGS_DICT = {
    "EXECUTION_ROLE_ARN": os.environ.get(
        "EXECUTION_ROLE_ARN",
        "arn:aws:iam::{}:role/evalaiTaskExecutionRole".format(
            aws_keys["AWS_ACCOUNT_ID"]
        ),
    ),
    "WORKER_IMAGE": os.environ.get(
        "WORKER_IMAGE",
        "{}.dkr.ecr.us-east-1.amazonaws.com/evalai-{}-worker:latest".format(
            aws_keys["AWS_ACCOUNT_ID"], ENV
        ),
    ),
    "CODE_UPLOAD_WORKER_IMAGE": os.environ.get(
        "CODE_UPLOAD_WORKER_IMAGE",
        "{}.dkr.ecr.us-east-1.amazonaws.com/evalai-{}-worker:latest".format(
            aws_keys["AWS_ACCOUNT_ID"], ENV
        ),
    ),
    "CIDR": os.environ.get("CIDR"),
    "CLUSTER": os.environ.get("CLUSTER", "evalai-prod-cluster"),
    "DJANGO_SERVER": os.environ.get("DJANGO_SERVER", "localhost"),
    "EVALAI_API_SERVER": os.environ.get("EVALAI_API_SERVER", "localhost"),
    "DEBUG": settings.DEBUG,
    "EMAIL_HOST": settings.EMAIL_HOST,
    "EMAIL_HOST_PASSWORD": settings.EMAIL_HOST_PASSWORD,
    "EMAIL_HOST_USER": settings.EMAIL_HOST_USER,
    "EMAIL_PORT": settings.EMAIL_PORT,
    "EMAIL_USE_TLS": settings.EMAIL_USE_TLS,
    "MEMCACHED_LOCATION": os.environ.get("MEMCACHED_LOCATION", None),
    "RDS_DB_NAME": settings.DATABASES["default"]["NAME"],
    "RDS_HOSTNAME": settings.DATABASES["default"]["HOST"],
    "RDS_PASSWORD": settings.DATABASES["default"]["PASSWORD"],
    "RDS_USERNAME": settings.DATABASES["default"]["USER"],
    "RDS_PORT": settings.DATABASES["default"]["PORT"],
    "SECRET_KEY": settings.SECRET_KEY,
    "SENTRY_URL": os.environ.get("SENTRY_URL"),
    "STATSD_ENDPOINT": os.environ.get("STATSD_ENDPOINT"),
    "STATSD_PORT": os.environ.get("STATSD_PORT"),
}

VPC_DICT = {
    "SUBNET_1": os.environ.get("SUBNET_1", "subnet1"),
    "SUBNET_2": os.environ.get("SUBNET_2", "subnet2"),
    "SUBNET_SECURITY_GROUP": os.environ.get("SUBNET_SECURITY_GROUP", "sg"),
}


def get_code_upload_setup_meta_for_challenge(challenge_pk):
    """
    Return the EKS cluster network and ARN metadata for a challenge.
    Arguments:
        challenge_pk {int} -- challenge pk for which credentials are to be fetched
    Returns:
        code_upload_meta {dict} -- dict containing cluster network and ARN metadata
    """
    from .models import ChallengeEvaluationCluster
    from .utils import get_challenge_model

    challenge = get_challenge_model(challenge_pk)
    if challenge.use_host_credentials:
        challenge_evaluation_cluster = ChallengeEvaluationCluster.objects.get(
            challenge=challenge
        )
        code_upload_meta = {
            "SUBNET_1": challenge_evaluation_cluster.subnet_1_id,
            "SUBNET_2": challenge_evaluation_cluster.subnet_2_id,
            "SUBNET_SECURITY_GROUP": challenge_evaluation_cluster.security_group_id,
            "EKS_NODEGROUP_ROLE_ARN": challenge_evaluation_cluster.node_group_arn_role,
            "EKS_CLUSTER_ROLE_ARN": challenge_evaluation_cluster.eks_arn_role,
        }
    else:
        code_upload_meta = {
            "SUBNET_1": VPC_DICT["SUBNET_1"],
            "SUBNET_2": VPC_DICT["SUBNET_2"],
            "SUBNET_SECURITY_GROUP": VPC_DICT["SUBNET_SECURITY_GROUP"],
            "EKS_NODEGROUP_ROLE_ARN": settings.EKS_NODEGROUP_ROLE_ARN,
            "EKS_CLUSTER_ROLE_ARN": settings.EKS_CLUSTER_ROLE_ARN,
        }
    return code_upload_meta
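
# Shape of the metadata returned above (all values illustrative):
#
#     {
#         "SUBNET_1": "subnet-0aaa...",
#         "SUBNET_2": "subnet-0bbb...",
#         "SUBNET_SECURITY_GROUP": "sg-0ccc...",
#         "EKS_NODEGROUP_ROLE_ARN": "arn:aws:iam::123456789012:role/nodegroup-role",
#         "EKS_CLUSTER_ROLE_ARN": "arn:aws:iam::123456789012:role/cluster-role",
#     }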


def get_log_group_name(challenge_pk):
    log_group_name = "challenge-pk-{}-{}-workers".format(
        challenge_pk, settings.ENVIRONMENT
    )
    return log_group_name
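
# e.g. get_log_group_name(7) -> "challenge-pk-7-production-workers" when
# settings.ENVIRONMENT == "production" (environment value illustrative).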


def client_token_generator(challenge_pk):
    """
    Return a 32-character client token to ensure idempotency of create_service boto3 requests.

    Parameters:
    challenge_pk (int): The challenge pk, used as the prefix of the token.

    Returns:
    str: A 32-character string composed of the challenge pk followed by random letters and digits.
    """
    remaining_chars = 32 - len(str(challenge_pk))
    random_char_string = "".join(
        random.choices(string.ascii_letters + string.digits, k=remaining_chars)
    )
    client_token = f"{str(challenge_pk)}{random_char_string}"

    return client_token
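
# A minimal sketch of the token's structure (pk value illustrative): the
# challenge pk forms a prefix, padded with random characters to 32 total, so
# retries of the same create_service request can reuse one token while
# distinct requests stay unique.
#
#     token = client_token_generator(42)
#     assert len(token) == 32 and token.startswith("42")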


def register_task_def_by_challenge_pk(client, queue_name, challenge):
    """
    Registers the task definition of the worker for a challenge, before creating a service.

    Parameters:
    client (boto3.client): The client used for making requests to ECS.
    queue_name (str): The queue field of the Challenge model, used in many parameters of the task definition.
    challenge (<class 'challenges.models.Challenge'>): The challenge object for which the task definition is being registered.

    Returns:
    dict: A dict of the task definition and its ARN if successful, and an error dictionary if not.
    """
    container_name = "worker_{}".format(queue_name)
    code_upload_container_name = "code_upload_worker_{}".format(queue_name)
    worker_cpu_cores = challenge.worker_cpu_cores
    worker_memory = challenge.worker_memory
    ephemeral_storage = challenge.ephemeral_storage
    log_group_name = get_log_group_name(challenge.pk)
    execution_role_arn = COMMON_SETTINGS_DICT["EXECUTION_ROLE_ARN"]
    AWS_SES_REGION_NAME = settings.AWS_SES_REGION_NAME
    AWS_SES_REGION_ENDPOINT = settings.AWS_SES_REGION_ENDPOINT

    if challenge.worker_image_url:
        updated_settings = {**COMMON_SETTINGS_DICT, "WORKER_IMAGE": challenge.worker_image_url}
    else:
        updated_settings = COMMON_SETTINGS_DICT

    if execution_role_arn:
        from .utils import get_aws_credentials_for_challenge

        challenge_aws_keys = get_aws_credentials_for_challenge(challenge.pk)
        if challenge.is_docker_based:
            from .models import ChallengeEvaluationCluster

            # Cluster detail to be used by code-upload-worker
            try:
                cluster_details = ChallengeEvaluationCluster.objects.get(
                    challenge=challenge
                )
                cluster_name = cluster_details.name
                cluster_endpoint = cluster_details.cluster_endpoint
                cluster_certificate = cluster_details.cluster_ssl
                efs_id = cluster_details.efs_id
            except ChallengeEvaluationCluster.DoesNotExist as e:
                logger.exception(e)
                message = "Challenge evaluation cluster for challenge {} does not exist.".format(
                    challenge.pk
                )
                return {
                    "Error": message,
                    "ResponseMetadata": {"HTTPStatusCode": HTTPStatus.BAD_REQUEST},
                }
            # challenge host auth token to be used by code-upload-worker
            token = JwtToken.objects.get(user=challenge.creator.created_by)
            if challenge.is_static_dataset_code_upload:
                code_upload_container = (
                    container_definition_code_upload_worker.format(
                        queue_name=queue_name,
                        code_upload_container_name=code_upload_container_name,
                        auth_token=token.refresh_token,
                        cluster_name=cluster_name,
                        cluster_endpoint=cluster_endpoint,
                        certificate=cluster_certificate,
                        log_group_name=log_group_name,
                        EVALAI_DNS=EVALAI_DNS,
                        EFS_ID=efs_id,
                        **updated_settings,
                        **challenge_aws_keys,
                    )
                )
                submission_container = (
                    container_definition_submission_worker.format(
                        queue_name=queue_name,
                        container_name=container_name,
                        ENV=ENV,
                        challenge_pk=challenge.pk,
                        log_group_name=log_group_name,
                        AWS_SES_REGION_NAME=AWS_SES_REGION_NAME,
                        AWS_SES_REGION_ENDPOINT=AWS_SES_REGION_ENDPOINT,
                        **updated_settings,
                        **aws_keys,
                    )
                )
                definition = task_definition_static_code_upload_worker.format(
                    queue_name=queue_name,
                    code_upload_container=code_upload_container,
                    submission_container=submission_container,
                    CPU=worker_cpu_cores,
                    MEMORY=worker_memory,
                    ephemeral_storage=ephemeral_storage,
                    **updated_settings,
                )
            else:
                definition = task_definition_code_upload_worker.format(
                    queue_name=queue_name,
                    code_upload_container_name=code_upload_container_name,
                    ENV=ENV,
                    challenge_pk=challenge.pk,
                    auth_token=token.refresh_token,
                    cluster_name=cluster_name,
                    cluster_endpoint=cluster_endpoint,
                    certificate=cluster_certificate,
                    CPU=worker_cpu_cores,
                    MEMORY=worker_memory,
                    ephemeral_storage=ephemeral_storage,
                    log_group_name=log_group_name,
                    EVALAI_DNS=EVALAI_DNS,
                    EFS_ID=efs_id,
                    **updated_settings,
                    **challenge_aws_keys,
                )
        else:
            definition = task_definition.format(
                queue_name=queue_name,
                container_name=container_name,
                ENV=ENV,
                challenge_pk=challenge.pk,
                CPU=worker_cpu_cores,
                MEMORY=worker_memory,
                ephemeral_storage=ephemeral_storage,
                log_group_name=log_group_name,
                AWS_SES_REGION_NAME=AWS_SES_REGION_NAME,
                AWS_SES_REGION_ENDPOINT=AWS_SES_REGION_ENDPOINT,
                **updated_settings,
                **challenge_aws_keys,
            )
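        # The task definition templates imported from task_definitions.py are
        # Python dict literals rendered as strings; eval() below converts the
        # rendered string back into the dict that boto3 expects.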
        definition = eval(definition)
        if not challenge.task_def_arn:
            try:
                response = client.register_task_definition(**definition)
                if (
                    response["ResponseMetadata"]["HTTPStatusCode"]
                    == HTTPStatus.OK
                ):
                    task_def_arn = response["taskDefinition"][
                        "taskDefinitionArn"
                    ]
                    challenge.task_def_arn = task_def_arn
                    challenge.save()
                return response
            except ClientError as e:
                logger.exception(e)
                return e.response
        else:
            message = "Error. Task definition already registered for challenge {}.".format(
                challenge.pk
            )
            return {
                "Error": message,
                "ResponseMetadata": {"HTTPStatusCode": HTTPStatus.BAD_REQUEST},
            }
    else:
        message = "Please ensure that the TASK_EXECUTION_ROLE_ARN is appropriately passed as an environment varible."
        return {
            "Error": message,
            "ResponseMetadata": {"HTTPStatusCode": HTTPStatus.BAD_REQUEST},
        }
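
# Hedged usage sketch (names as defined in this module; not executed here):
# on success the task definition ARN is persisted on the challenge model.
#
#     ecs = get_boto3_client("ecs", aws_keys)
#     resp = register_task_def_by_challenge_pk(ecs, challenge.queue, challenge)
#     if resp["ResponseMetadata"]["HTTPStatusCode"] == HTTPStatus.OK:
#         print(challenge.task_def_arn)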


def create_service_by_challenge_pk(client, challenge, client_token):
    """
    Creates the worker service for a challenge, and sets the number of workers to one.

    Parameters:
    client (boto3.client): The client used for making requests to ECS.
    challenge (<class 'challenges.models.Challenge'>): The challenge object for which the service is being created.
    client_token (str): The client token generated by client_token_generator().

    Returns:
    dict: The response returned by the create_service method from boto3. If unsuccessful, returns an error dictionary.
    """

    queue_name = challenge.queue
    service_name = "{}_service".format(queue_name)
    if (
        challenge.workers is None
    ):  # Verify that the challenge is new (i.e., service not yet created).
        if challenge.task_def_arn == "" or challenge.task_def_arn is None:
            response = register_task_def_by_challenge_pk(
                client, queue_name, challenge
            )
            if response["ResponseMetadata"]["HTTPStatusCode"] != HTTPStatus.OK:
                return response
        task_def_arn = challenge.task_def_arn
        definition = service_definition.format(
            CLUSTER=COMMON_SETTINGS_DICT["CLUSTER"],
            service_name=service_name,
            task_def_arn=task_def_arn,
            client_token=client_token,
            **VPC_DICT,
        )
        definition = eval(definition)
        try:
            response = client.create_service(**definition)
            if response["ResponseMetadata"]["HTTPStatusCode"] == HTTPStatus.OK:
                challenge.workers = 1
                challenge.save()
            return response
        except ClientError as e:
            logger.exception(e)
            return e.response
    else:
        message = "Worker service for challenge {} already exists. Please scale, stop or delete.".format(
            challenge.pk
        )
        return {
            "Error": message,
            "ResponseMetadata": {"HTTPStatusCode": HTTPStatus.BAD_REQUEST},
        }


def update_service_by_challenge_pk(
    client, challenge, num_of_tasks, force_new_deployment=False
):
    """
    Updates the worker service for a challenge, and scales the number of workers to num_of_tasks.

    Parameters:
    client (boto3.client): The client used for making requests to ECS.
    challenge (<class 'challenges.models.Challenge'>): The challenge object for which the service is being updated.
    num_of_tasks (int): Number of workers to scale to for the challenge.
    force_new_deployment (bool): Set to True (mainly for restarting) to redeploy with the latest image from ECR. Default is False.

    Returns:
    dict: The response returned by the update_service method from boto3. If unsuccessful, returns an error dictionary.
    """

    queue_name = challenge.queue
    service_name = "{}_service".format(queue_name)
    task_def_arn = challenge.task_def_arn

    kwargs = update_service_args.format(
        CLUSTER=COMMON_SETTINGS_DICT["CLUSTER"],
        service_name=service_name,
        task_def_arn=task_def_arn,
        force_new_deployment=force_new_deployment,
        num_of_tasks=num_of_tasks,
    )
    kwargs = eval(kwargs)

    try:
        response = client.update_service(**kwargs)
        if response["ResponseMetadata"]["HTTPStatusCode"] == HTTPStatus.OK:
            challenge.workers = num_of_tasks
            challenge.save()
        return response
    except ClientError as e:
        logger.exception(e)
        return e.response


def delete_service_by_challenge_pk(challenge):
    """
    Deletes the worker service of a challenge.

    Before deleting, it scales the number of workers in the service down to 0, then deletes the service and deregisters its task definition.

    Parameters:
    challenge (<class 'challenges.models.Challenge'>): The challenge object whose worker service is being deleted.

    Returns:
    dict: The response returned by the delete_service method from boto3.
    """
    client = get_boto3_client("ecs", aws_keys)
    queue_name = challenge.queue
    service_name = "{}_service".format(queue_name)
    kwargs = delete_service_args.format(
        CLUSTER=COMMON_SETTINGS_DICT["CLUSTER"],
        service_name=service_name,
        force=True,
    )
    kwargs = eval(kwargs)
    try:
        if challenge.workers != 0:
            response = update_service_by_challenge_pk(
                client, challenge, 0, False
            )
            if response["ResponseMetadata"]["HTTPStatusCode"] != HTTPStatus.OK:
                return response

        response = client.delete_service(**kwargs)
        if response["ResponseMetadata"]["HTTPStatusCode"] == HTTPStatus.OK:
            challenge.workers = None
            challenge.save()
            client.deregister_task_definition(
                taskDefinition=challenge.task_def_arn
            )
            challenge.task_def_arn = ""
            challenge.save()
        return response
    except ClientError as e:
        logger.exception(e)
        return e.response


def service_manager(
    client, challenge, num_of_tasks=None, force_new_deployment=False
):
    """
    Determines whether the challenge's service already exists, and accordingly calls update_service_by_challenge_pk or create_service_by_challenge_pk.

    Called by: the start, stop and scale methods for multiple workers.

    Parameters:
    client (boto3.client): The client used for making requests to ECS.
    challenge (<class 'challenges.models.Challenge'>): The challenge object whose worker service is being managed.
    num_of_tasks (int): The number of workers to scale to (relevant only if the service already exists).
                  default: None
    force_new_deployment (bool): Whether to force a new deployment of the service (used for restarts). Default is False.

    Returns:
    dict: The response returned by update_service_by_challenge_pk or create_service_by_challenge_pk.
    """
    if challenge.workers is not None:
        response = update_service_by_challenge_pk(
            client, challenge, num_of_tasks, force_new_deployment
        )
        return response
    else:
        client_token = client_token_generator(challenge.pk)
        response = create_service_by_challenge_pk(
            client, challenge, client_token
        )
        return response
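
# Hedged usage sketch: service_manager is the single entry point used by the
# start/stop/scale/restart admin actions below. For example, to scale a
# challenge's service to 3 workers (challenge object assumed):
#
#     ecs = get_boto3_client("ecs", aws_keys)
#     response = service_manager(ecs, challenge=challenge, num_of_tasks=3)
#     if response["ResponseMetadata"]["HTTPStatusCode"] == HTTPStatus.OK:
#         print("scaled to", challenge.workers)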


def stop_ec2_instance(challenge):
    """
    Stop the EC2 instance associated with a challenge if status checks are ready.

    Args:
        challenge (Challenge): The challenge for which the EC2 instance needs to be stopped.

    Returns:
        dict: A dictionary containing the status and message of the stop operation.
    """
    target_instance_id = challenge.ec2_instance_id

    ec2 = get_boto3_client("ec2", aws_keys)
    status_response = ec2.describe_instance_status(InstanceIds=[target_instance_id])

    if status_response["InstanceStatuses"]:
        instance_status = status_response["InstanceStatuses"][0]
        system_status = instance_status["SystemStatus"]["Status"]
        instance_status_check = instance_status["InstanceStatus"]["Status"]

        if system_status == "ok" and instance_status_check == "ok":
            instance_state = instance_status["InstanceState"]["Name"]

            if instance_state == "running":
                try:
                    response = ec2.stop_instances(InstanceIds=[target_instance_id])
                    message = "Instance for challenge {} successfully stopped.".format(challenge.pk)
                    return {
                        "response": response,
                        "message": message,
                    }
                except ClientError as e:
                    logger.exception(e)
                    return {
                        "error": e.response,
                    }
            else:
                message = "Instance for challenge {} is not running. Please ensure the instance is running.".format(challenge.pk)
                return {
                    "error": message,
                }
        else:
            message = "Instance status checks are not ready for challenge {}. Please wait for the status checks to pass.".format(challenge.pk)
            return {
                "error": message,
            }
    else:
        message = "Instance for challenge {} not found. Please ensure the instance exists.".format(challenge.pk)
        return {
            "error": message,
        }


def describe_ec2_instance(challenge):
    """
    Describe the EC2 instance associated with a challenge.

    Args:
        challenge (Challenge): The challenge for which the EC2 instance description is needed.

    Returns:
        dict: A dictionary containing the status and message of the operation.
    """
    target_instance_id = challenge.ec2_instance_id
    try:
        ec2 = get_boto3_client("ec2", aws_keys)
        response = ec2.describe_instances(InstanceIds=[target_instance_id])

        instances = [
            instance
            for reservation in response["Reservations"]
            for instance in reservation["Instances"]
        ]
        if not instances:
            return {
                "error": "Instance for challenge {} not found.".format(
                    challenge.pk
                )
            }
        instance = instances[0]
        return {"message": instance}
    except ClientError as e:
        logger.exception(e)
        return {
            "error": e.response,
        }


def start_ec2_instance(challenge):
    """
    Start the EC2 instance associated with a challenge.

    Args:
        challenge (Challenge): The challenge for which the EC2 instance needs to be started.

    Returns:
        dict: A dictionary containing the status and message of the start operation.
    """

    target_instance_id = challenge.ec2_instance_id

    ec2 = get_boto3_client("ec2", aws_keys)
    response = ec2.describe_instances(InstanceIds=[target_instance_id])

    instances = [
        instance
        for reservation in response["Reservations"]
        for instance in reservation["Instances"]
    ]

    if instances:
        instance = instances[0]
        instance_id = instance["InstanceId"]
        if instance["State"]["Name"] == "stopped":
            try:
                response = ec2.start_instances(InstanceIds=[instance_id])
                message = "Instance for challenge {} successfully started.".format(
                    challenge.pk
                )
                return {
                    "response": response,
                    "message": message,
                }
            except ClientError as e:
                logger.exception(e)
                return {
                    "error": e.response,
                }
        else:
            message = "Instance for challenge {} is running. Please ensure the instance is stopped.".format(
                challenge.pk
            )
            return {
                "error": message,
            }
    else:
        message = "Instance for challenge {} not found. Please ensure the instance exists.".format(
            challenge.pk
        )
        return {
            "error": message,
        }


def restart_ec2_instance(challenge):
    """
    Reboot the EC2 instance associated with a challenge.

    Args:
        challenge (Challenge): The challenge for which the EC2 instance needs to be restarted.

    Returns:
        dict: A dictionary containing the status and message of the reboot operation.
    """

    target_instance_id = challenge.ec2_instance_id

    ec2 = get_boto3_client("ec2", aws_keys)

    try:
        response = ec2.reboot_instances(InstanceIds=[target_instance_id])
        message = "Instance for challenge {} successfully restarted.".format(
            challenge.pk
        )
        return {
            "response": response,
            "message": message,
        }
    except ClientError as e:
        logger.exception(e)
        return {
            "error": e.response,
        }


def terminate_ec2_instance(challenge):
    """
    Terminate the EC2 instance associated with a challenge.

    Args:
        challenge (Challenge): The challenge for which the EC2 instance needs to be terminated.

    Returns:
        dict: A dictionary containing the status and message of the terminate operation.
    """

    target_instance_id = challenge.ec2_instance_id

    ec2 = get_boto3_client("ec2", aws_keys)

    try:
        response = ec2.terminate_instances(InstanceIds=[target_instance_id])
        challenge.ec2_instance_id = ""
        challenge.save()
        message = "Instance for challenge {} successfully terminated.".format(
            challenge.pk
        )
        return {
            "response": response,
            "message": message,
        }
    except ClientError as e:
        logger.exception(e)
        return {
            "error": e.response,
        }


def create_ec2_instance(challenge, ec2_storage=None, worker_instance_type=None, worker_image_url=None):
    """
    Create the EC2 instance associated with a challenge.

    Args:
        challenge (Challenge): The challenge for which the EC2 instance needs to be created.
        ec2_storage (int, optional): EBS volume size in GB; overrides challenge.ec2_storage when provided.
        worker_instance_type (str, optional): EC2 instance type; overrides challenge.worker_instance_type when provided.
        worker_image_url (str, optional): Custom worker image URL; overrides challenge.worker_image_url when provided.

    Returns:
        dict: A dictionary containing the status and message of the creation operation.
    """

    target_instance_id = challenge.ec2_instance_id
    if target_instance_id:
        return {
            "error": "Challenge {} has existing EC2 instance ID."
            " Please ensure there is no existing associated instance before trying to create one.".format(challenge.pk)
        }

    ec2 = get_boto3_client("ec2", aws_keys)

    with open('/code/scripts/deployment/deploy_ec2_worker.sh') as f:
        ec2_worker_script = f.read()

    if ec2_storage:
        challenge.ec2_storage = ec2_storage

    if worker_instance_type:
        challenge.worker_instance_type = worker_instance_type

    if worker_image_url:
        challenge.worker_image_url = worker_image_url
    else:
        challenge.worker_image_url = "" if challenge.worker_image_url is None else challenge.worker_image_url

    variables = {
        "AWS_ACCOUNT_ID": aws_keys["AWS_ACCOUNT_ID"],
        "AWS_ACCESS_KEY_ID": aws_keys["AWS_ACCESS_KEY_ID"],
        "AWS_SECRET_ACCESS_KEY": aws_keys["AWS_SECRET_ACCESS_KEY"],
        "AWS_REGION": aws_keys["AWS_REGION"],
        "PK": str(challenge.pk),
        "QUEUE": challenge.queue,
        "ENVIRONMENT": settings.ENVIRONMENT,
        "CUSTOM_WORKER_IMAGE": challenge.worker_image_url,
    }

    for key, value in variables.items():
        ec2_worker_script = ec2_worker_script.replace("${" + key + "}", value)

    instance_name = "Worker-Instance-{}-{}".format(settings.ENVIRONMENT, challenge.pk)
    blockDeviceMappings = [
        {
            'DeviceName': '/dev/sda1',
            'Ebs': {
                'DeleteOnTermination': True,
                'VolumeSize': challenge.ec2_storage,
                'VolumeType': 'gp2'
            }
        },
    ]

    try:
        response = ec2.run_instances(
            BlockDeviceMappings=blockDeviceMappings,
            ImageId='ami-0747bdcabd34c712a',  # TODO: Make this customizable
            InstanceType=challenge.worker_instance_type,
            MinCount=1,
            MaxCount=1,
            SubnetId=VPC_DICT["SUBNET_1"],
            KeyName="cloudcv_2016",  # TODO: Remove hardcoding
            TagSpecifications=[
                {
                    "ResourceType": "instance",
                    "Tags": [
                        {"Key": "Name", "Value": instance_name},
                    ],
                }
            ],
            UserData=ec2_worker_script,
        )
        challenge.uses_ec2_worker = True
        challenge.ec2_instance_id = response['Instances'][0]['InstanceId']
        challenge.save()
        message = "Instance for challenge {} successfully created.".format(
            challenge.pk
        )
        return {
            "response": response,
            "message": message,
        }
    except ClientError as e:
        logger.exception(e)
        return {
            "error": e.response,
        }
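
# The deploy script read above uses shell-style ${VAR} placeholders that are
# substituted textually before being passed as EC2 user data. A minimal
# illustration of that substitution (values made up):
#
#     script = "docker run -e PK=${PK} -e QUEUE=${QUEUE} worker"
#     for key, value in {"PK": "7", "QUEUE": "my-queue"}.items():
#         script = script.replace("${" + key + "}", value)
#     # script == "docker run -e PK=7 -e QUEUE=my-queue worker"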


def update_sqs_retention_period(challenge):
    """
    Update the SQS retention period for a challenge.

    Args:
        challenge (Challenge): The challenge for which the SQS retention period is to be updated.

    Returns:
        dict: A dictionary containing the status and message of the operation.
    """
    sqs_retention_period = str(challenge.sqs_retention_period)
    try:
        sqs = get_boto3_client("sqs", aws_keys)
        queue_url = sqs.get_queue_url(QueueName=challenge.queue)['QueueUrl']
        response = sqs.set_queue_attributes(
            QueueUrl=queue_url,
            Attributes={
                'MessageRetentionPeriod': sqs_retention_period
            }
        )
        return {"message": response}
    except Exception as e:
        logger.exception(e)
        return {
            "error": str(e),
        }
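
# MessageRetentionPeriod is expressed in seconds; SQS accepts values from 60
# (1 minute) to 1209600 (14 days). A hedged usage sketch (challenge object
# assumed):
#
#     challenge.sqs_retention_period = 1209600  # retain messages for 14 days
#     challenge.save()
#     update_sqs_retention_period(challenge)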


def start_workers(queryset):
    """
    The function called by the admin action method to start all the selected workers.

    Calls the service_manager method. Before calling, checks that all the selected workers are inactive.

    Parameters:
    queryset (<class 'django.db.models.query.QuerySet'>): The queryset of selected challenges in the django admin page.

    Returns:
    dict: keys-> 'count': the number of workers successfully started.
                 'failures': a list of dicts, each with the error message and the challenge pk.
    """
    if settings.DEBUG:
        failures = []
        for challenge in queryset:
            failures.append(
                {
                    "message": "Workers cannot be started on AWS ECS service in development environment",
                    "challenge_pk": challenge.pk,
                }
            )
        return {"count": 0, "failures": failures}

    client = get_boto3_client("ecs", aws_keys)
    count = 0
    failures = []
    for challenge in queryset:
        if (challenge.workers == 0) or (challenge.workers is None):
            response = service_manager(
                client, challenge=challenge, num_of_tasks=1
            )
            if response["ResponseMetadata"]["HTTPStatusCode"] != HTTPStatus.OK:
                failures.append(
                    {
                        "message": response["Error"],
                        "challenge_pk": challenge.pk,
                    }
                )
                continue
            count += 1
        else:
            response = "Please select challenge with inactive workers only."
            failures.append(
                {"message": response, "challenge_pk": challenge.pk}
            )
    return {"count": count, "failures": failures}


def stop_workers(queryset):
    """
    The function called by the admin action method to stop all the selected workers.

    Calls the service_manager method. Before calling, verifies that the challenge is not new, and is active.

    Parameters:
    queryset (<class 'django.db.models.query.QuerySet'>): The queryset of selected challenges in the django admin page.

    Returns:
    dict: keys-> 'count': the number of workers successfully stopped.
                 'failures': a list of dicts, each with the error message and the challenge pk.
    """
    if settings.DEBUG:
        failures = []
        for challenge in queryset:
            failures.append(
                {
                    "message": "Workers cannot be stopped on AWS ECS service in development environment",
                    "challenge_pk": challenge.pk,
                }
            )
        return {"count": 0, "failures": failures}

    client = get_boto3_client("ecs", aws_keys)
    count = 0
    failures = []
    for challenge in queryset:
        if (challenge.workers is not None) and (challenge.workers > 0):
            response = service_manager(
                client, challenge=challenge, num_of_tasks=0
            )
            if response["ResponseMetadata"]["HTTPStatusCode"] != HTTPStatus.OK:
                failures.append(
                    {
                        "message": response["Error"],
                        "challenge_pk": challenge.pk,
                    }
                )
                continue
            count += 1
        else:
            response = "Please select challenges with active workers only."
            failures.append(
                {"message": response, "challenge_pk": challenge.pk}
            )
    return {"count": count, "failures": failures}


def scale_workers(queryset, num_of_tasks):
    """
    The function called by the admin action method to scale all the selected workers.

    Calls the service_manager method. Before calling, checks that the target scaling number differs from the current number of workers.

    Parameters:
    queryset (<class 'django.db.models.query.QuerySet'>): The queryset of selected challenges in the django admin page.
    num_of_tasks (int): The number of workers to scale each selected challenge to.

    Returns:
    dict: keys-> 'count': the number of challenges successfully scaled.
                 'failures': a list of dicts, each with the error message and the challenge pk.
    """
    if settings.DEBUG:
        failures = []
        for challenge in queryset:
            failures.append(
                {
                    "message": "Workers cannot be scaled on AWS ECS service in development environment",
                    "challenge_pk": challenge.pk,
                }
            )
        return {"count": 0, "failures": failures}

    client = get_boto3_client("ecs", aws_keys)
    count = 0
    failures = []
    for challenge in queryset:
        if challenge.workers is None:
            response = "Please start worker(s) before scaling."
            failures.append(
                {"message": response, "challenge_pk": challenge.pk}
            )
            continue
        if num_of_tasks == challenge.workers:
            response = "Please scale to a different number. Challenge has {} worker(s).".format(
                num_of_tasks
            )
            failures.append(
                {"message": response, "challenge_pk": challenge.pk}
            )
            continue
        response = service_manager(
            client, challenge=challenge, num_of_tasks=num_of_tasks
        )
        if response["ResponseMetadata"]["HTTPStatusCode"] != HTTPStatus.OK:
            failures.append(
                {"message": response["Error"], "challenge_pk": challenge.pk}
            )
            continue
        count += 1
    return {"count": count, "failures": failures}


def scale_resources(challenge, worker_cpu_cores, worker_memory):
    """
    The function called by scale_resources_by_challenge_pk to send the AWS ECS request to update the resources used by
    a challenge's workers.

    Deregisters the old task definition and registers a new definition that is substituted into the challenge workers' service.

    Parameters:
    challenge (<class 'challenges.models.Challenge'>): The challenge object whose worker resources are being scaled.
    worker_cpu_cores (int): CPU units to assign to each worker (ECS uses 1024 CPU units per vCPU).
    worker_memory (int): The amount of memory (MB) that should be assigned to each worker.

    Returns:
    dict: The response returned by the boto3 register_task_definition / update_service calls, or an error dictionary if unsuccessful.
    """

    from .utils import get_aws_credentials_for_challenge

    client = get_boto3_client("ecs", aws_keys)

    if challenge.worker_cpu_cores == worker_cpu_cores and challenge.worker_memory == worker_memory:
        return {
            "Success": True,
            "Message": "Worker not modified",
            "ResponseMetadata": {"HTTPStatusCode": HTTPStatus.OK},
        }

    if not challenge.task_def_arn:
        message = "Error. No active task definition registered for the challenge {}.".format(
            challenge.pk
        )
        return {
            "Error": message,
            "ResponseMetadata": {"HTTPStatusCode": HTTPStatus.BAD_REQUEST},
        }

    try:
        response = client.deregister_task_definition(
            taskDefinition=challenge.task_def_arn
        )
        if (
                response["ResponseMetadata"]["HTTPStatusCode"]
                == HTTPStatus.OK
        ):
            challenge.task_def_arn = None
            challenge.save()
    except ClientError as e:
        e.response["Error"] = True
        e.response["Message"] = "Scaling inactive workers not supported"
        logger.exception(e)
        return e.response

    if challenge.worker_image_url:
        updated_settings = {**COMMON_SETTINGS_DICT, "WORKER_IMAGE": challenge.worker_image_url}
    else:
        updated_settings = COMMON_SETTINGS_DICT

    queue_name = challenge.queue
    container_name = "worker_{}".format(queue_name)
    log_group_name = get_log_group_name(challenge.pk)
    challenge_aws_keys = get_aws_credentials_for_challenge(challenge.pk)
    task_def = task_definition.format(
        queue_name=queue_name,
        container_name=container_name,
        ENV=ENV,
        challenge_pk=challenge.pk,
        CPU=worker_cpu_cores,
        MEMORY=worker_memory,
        ephemeral_storage=challenge.ephemeral_storage,
        log_group_name=log_group_name,
        AWS_SES_REGION_NAME=settings.AWS_SES_REGION_NAME,
        AWS_SES_REGION_ENDPOINT=settings.AWS_SES_REGION_ENDPOINT,
        **updated_settings,
        **challenge_aws_keys,
    )
    task_def = eval(task_def)

    try:
        response = client.register_task_definition(**task_def)
        if (
                response["ResponseMetadata"]["HTTPStatusCode"]
                == HTTPStatus.OK
        ):
            challenge.worker_cpu_cores = worker_cpu_cores
            challenge.worker_memory = worker_memory
            task_def_arn = response["taskDefinition"][
                "taskDefinitionArn"
            ]

            challenge.task_def_arn = task_def_arn
            challenge.save()
            force_new_deployment = False
            service_name = "{}_service".format(queue_name)
            num_of_tasks = challenge.workers
            kwargs = update_service_args.format(
                CLUSTER=COMMON_SETTINGS_DICT["CLUSTER"],
                service_name=service_name,
                task_def_arn=task_def_arn,
                num_of_tasks=num_of_tasks,
                force_new_deployment=force_new_deployment,
            )
            kwargs = eval(kwargs)
            response = client.update_service(**kwargs)
        return response
    except ClientError as e:
        logger.exception(e)
        return e.response
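
# Hedged usage sketch (values illustrative): scale a challenge's workers to
# 2 vCPU and 8 GB of memory. ECS expresses CPU in units (1024 units = 1 vCPU)
# and memory in MiB.
#
#     response = scale_resources(challenge, worker_cpu_cores=2048, worker_memory=8192)
#     if response["ResponseMetadata"]["HTTPStatusCode"] == HTTPStatus.OK:
#         print("workers redeployed with the new task definition")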


def delete_workers(queryset):
    """
    The function called by the admin action method to delete all the selected workers.

    Calls the delete_service_by_challenge_pk method. Before calling, verifies that the challenge is not new.

    Parameters:
    queryset (<class 'django.db.models.query.QuerySet'>): The queryset of selected challenges in the django admin page.

    Returns:
    dict: keys-> 'count': the number of worker services successfully deleted.
                 'failures': a list of dicts, each with the error message and the challenge pk.
    """
    if settings.DEBUG:
        failures = []
        for challenge in queryset:
            failures.append(
                {
                    "message": "Workers cannot be deleted on AWS ECS service in development environment",
                    "challenge_pk": challenge.pk,
                }
            )
        return {"count": 0, "failures": failures}

    count = 0
    failures = []
    for challenge in queryset:
        if challenge.workers is not None:
            response = delete_service_by_challenge_pk(challenge=challenge)
            if response["ResponseMetadata"]["HTTPStatusCode"] != HTTPStatus.OK:
                failures.append(
                    {
                        "message": response["Error"],
                        "challenge_pk": challenge.pk,
                    }
                )
                continue
            count += 1
            log_group_name = get_log_group_name(challenge.pk)
            delete_log_group(log_group_name)
        else:
            response = "Please select challenges with active workers only."
            failures.append(
                {"message": response, "challenge_pk": challenge.pk}
            )
    return {"count": count, "failures": failures}


def restart_workers(queryset):
    """
    The function called by the admin action method to restart all the selected workers.

    Calls the service_manager method. Before calling, verifies that the challenge's workers are active.

    Parameters:
    queryset (<class 'django.db.models.query.QuerySet'>): The queryset of selected challenges in the django admin page.

    Returns:
    dict: keys-> 'count': the number of workers successfully restarted.
                 'failures': a list of dicts, each with the error message and the challenge pk.
    """
    if settings.DEBUG:
        failures = []
        for challenge in queryset:
            failures.append(
                {
                    "message": "Workers cannot be restarted on AWS ECS service in development environment",
                    "challenge_pk": challenge.pk,
                }
            )
        return {"count": 0, "failures": failures}

    client = get_boto3_client("ecs", aws_keys)
    count = 0
    failures = []
    for challenge in queryset:
        if (
            challenge.is_docker_based
            and not challenge.is_static_dataset_code_upload
        ):
            response = "Sorry. This feature is not available for code upload/docker based challenges."
            failures.append(
                {"message": response, "challenge_pk": challenge.pk}
            )
        elif (challenge.workers is not None) and (challenge.workers > 0):
            response = service_manager(
                client,
                challenge=challenge,
                num_of_tasks=challenge.workers,
                force_new_deployment=True,
            )
            if response["ResponseMetadata"]["HTTPStatusCode"] != HTTPStatus.OK:
                failures.append(
                    {
                        "message": response["Error"],
                        "challenge_pk": challenge.pk,
                    }
                )
                continue
            count += 1
        else:
            response = "Please select challenges with active workers only."
            failures.append(
                {"message": response, "challenge_pk": challenge.pk}
            )
    return {"count": count, "failures": failures}


def restart_workers_signal_callback(sender, instance, field_name, **kwargs):
    """
    Called when either the evaluation_script or the test_annotation field of a challenge
    is updated, to restart the challenge workers.
    """
    if settings.DEBUG:
        return

    prev = getattr(instance, "_original_{}".format(field_name))
    curr = getattr(instance, "{}".format(field_name))

    if field_name == "evaluation_script":
        instance._original_evaluation_script = curr
    elif field_name == "test_annotation":
        instance._original_test_annotation = curr

    if prev != curr:
        challenge = None
        if field_name == "test_annotation":
            challenge = instance.challenge
        else:
            challenge = instance

        response = restart_workers([challenge])

        count, failures = response["count"], response["failures"]

        logger.info(
            "The worker service for challenge {} was restarted, as {} was changed.".format(
                challenge.pk, field_name
            )
        )

        if count != 1:
            logger.warning(
                "Worker(s) for challenge {} couldn't restart! Error: {}".format(
                    challenge.id, failures[0]["message"]
                )
            )
        else:
            challenge_url = "{}/web/challenges/challenge-page/{}".format(
                settings.EVALAI_API_SERVER, challenge.id
            )
            challenge_manage_url = (
                "{}/web/challenges/challenge-page/{}/manage".format(
                    settings.EVALAI_API_SERVER, challenge.id
                )
            )

            if field_name == "test_annotation":
                file_updated = "Test Annotation"
            elif field_name == "evaluation_script":
                file_updated = "Evaluation script"

            template_data = {
                "CHALLENGE_NAME": challenge.title,
                "CHALLENGE_MANAGE_URL": challenge_manage_url,
                "CHALLENGE_URL": challenge_url,
                "FILE_UPDATED": file_updated,
            }

            if challenge.image:
                template_data["CHALLENGE_IMAGE_URL"] = challenge.image.url

            template_id = settings.SENDGRID_SETTINGS.get("TEMPLATES").get(
                "WORKER_RESTART_EMAIL"
            )

            # Send email notification only when inform_hosts is true
            if challenge.inform_hosts:
                emails = challenge.creator.get_all_challenge_host_email()
                for email in emails:
                    send_email(
                        sender=settings.CLOUDCV_TEAM_EMAIL,
                        recipient=email,
                        template_id=template_id,
                        template_data=template_data,
                    )


def get_logs_from_cloudwatch(
    log_group_name, log_stream_prefix, start_time, end_time, pattern, limit
):
    """
    Fetch the logs of a container from CloudWatch within a specific time frame.
    """
    client = get_boto3_client("logs", aws_keys)
    logs = []
    if settings.DEBUG:
        logs = [
            "The worker logs in the development environment are available on the terminal. Please use docker-compose logs -f worker to view the logs."
        ]
    else:
        try:
            response = client.filter_log_events(
                logGroupName=log_group_name,
                logStreamNamePrefix=log_stream_prefix,
                startTime=start_time,
                endTime=end_time,
                filterPattern=pattern,
                limit=limit
            )
            for event in response["events"]:
                logs.append(event["message"])
            nextToken = response.get("nextToken", None)
            while nextToken is not None:
                response = client.filter_log_events(
                    logGroupName=log_group_name,
                    logStreamNamePrefix=log_stream_prefix,
                    startTime=start_time,
                    endTime=end_time,
                    filterPattern=pattern,
                    limit=limit,
                    nextToken=nextToken
                )
                nextToken = response.get("nextToken", None)
                for event in response["events"]:
                    logs.append(event["message"])
        except Exception as e:
            error_code = getattr(e, "response", {}).get("Error", {}).get("Code")
            if error_code == "ResourceNotFoundException":
                return logs

            logger.exception(e)
            return [
                f"There is an error in displaying logs. Please find the full error traceback here {e}"
            ]
    return logs
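
# Hedged usage sketch: CloudWatch's filter_log_events expects epoch timestamps
# in milliseconds (prefix, pattern and limit values below are illustrative):
#
#     import time
#     end_ms = int(time.time() * 1000)
#     start_ms = end_ms - 60 * 60 * 1000  # last hour
#     logs = get_logs_from_cloudwatch(
#         get_log_group_name(challenge.pk), "worker", start_ms, end_ms, "", 100
#     )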


def delete_log_group(log_group_name):
    if not settings.DEBUG:
        try:
            client = get_boto3_client("logs", aws_keys)
            client.delete_log_group(logGroupName=log_group_name)
        except Exception as e:
            logger.exception(e)


@app.task
def create_eks_nodegroup(challenge, cluster_name):
    """
    Creates a nodegroup when an EKS cluster is created by the EvalAI admin.
    Arguments:
        challenge {str} -- JSON-serialized challenge object (deserialized below)
        cluster_name {str} -- name of the EKS cluster
    """
    from .utils import get_aws_credentials_for_challenge

    for obj in serializers.deserialize("json", challenge):
        challenge_obj = obj.object
    environment_suffix = "{}-{}".format(challenge_obj.pk, settings.ENVIRONMENT)
    nodegroup_name = "{}-{}-nodegroup".format(
        challenge_obj.title.replace(" ", "-")[:20], environment_suffix
    )
    challenge_aws_keys = get_aws_credentials_for_challenge(challenge_obj.pk)
    client = get_boto3_client("eks", challenge_aws_keys)
    cluster_meta = get_code_upload_setup_meta_for_challenge(challenge_obj.pk)
    # TODO: Move the hardcoded cluster configuration such as the
    # instance_type, subnets, AMI to challenge configuration later.
    try:
        response = client.create_nodegroup(
            clusterName=cluster_name,
            nodegroupName=nodegroup_name,
            scalingConfig={
                "minSize": challenge_obj.min_worker_instance,
                "maxSize": challenge_obj.max_worker_instance,
                "desiredSize": challenge_obj.desired_worker_instance,
            },
            diskSize=challenge_obj.worker_disk_size,
            subnets=[cluster_meta["SUBNET_1"], cluster_meta["SUBNET_2"]],
            instanceTypes=[challenge_obj.worker_instance_type],
            amiType=challenge_obj.worker_ami_type,
            nodeRole=cluster_meta["EKS_NODEGROUP_ROLE_ARN"],
        )
        logger.info("Nodegroup create: {}".format(response))
    except ClientError as e:
        logger.exception(e)
        return
    waiter = client.get_waiter("nodegroup_active")
    waiter.wait(clusterName=cluster_name, nodegroupName=nodegroup_name)
    construct_and_send_eks_cluster_creation_mail(challenge_obj)
    # starting the code-upload-worker
    client = get_boto3_client("ecs", aws_keys)
    client_token = client_token_generator(challenge_obj.pk)
    create_service_by_challenge_pk(client, challenge_obj, client_token)


@app.task
def setup_eks_cluster(challenge):
    """
    Creates the EKS cluster and node group IAM roles and attaches the required policies.

    Arguments:
        challenge {str} -- JSON-serialized challenge object (deserialized below)
    """
    from .models import ChallengeEvaluationCluster
    from .serializers import ChallengeEvaluationClusterSerializer
    from .utils import get_aws_credentials_for_challenge

    for obj in serializers.deserialize("json", challenge):
        challenge_obj = obj.object
    challenge_aws_keys = get_aws_credentials_for_challenge(challenge_obj.pk)
    client = get_boto3_client("iam", challenge_aws_keys)
    environment_suffix = "{}-{}".format(challenge_obj.pk, settings.ENVIRONMENT)
    eks_role_name = "evalai-code-upload-eks-role-{}".format(environment_suffix)
    eks_arn_role = None
    try:
        response = client.create_role(
            RoleName=eks_role_name,
            Description="Amazon EKS cluster role with managed policy",
            AssumeRolePolicyDocument=json.dumps(
                settings.EKS_CLUSTER_TRUST_RELATION
            ),
        )
        eks_arn_role = response["Role"]["Arn"]
    except ClientError as e:
        logger.exception(e)
        return
    waiter = client.get_waiter("role_exists")
    waiter.wait(RoleName=eks_role_name)

    try:
        # Attach AWS managed EKS cluster policy to the role
        response = client.attach_role_policy(
            RoleName=eks_role_name,
            PolicyArn=settings.EKS_CLUSTER_POLICY,
        )
    except ClientError as e:
        logger.exception(e)
        return

    node_group_role_name = "evalai-code-upload-nodegroup-role-{}".format(
        environment_suffix
    )
    node_group_arn_role = None
    try:
        response = client.create_role(
            RoleName=node_group_role_name,
            Description="Amazon EKS node group role with managed policy",
            AssumeRolePolicyDocument=json.dumps(
                settings.EKS_NODE_GROUP_TRUST_RELATION
            ),
        )
        node_group_arn_role = response["Role"]["Arn"]
    except ClientError as e:
        logger.exception(e)
        return
    waiter = client.get_waiter("role_exists")
    waiter.wait(RoleName=node_group_role_name)

    task_execution_policies = settings.EKS_NODE_GROUP_POLICIES
    for policy_arn in task_execution_policies:
        try:
            # Attach AWS managed EKS worker node policy to the role
            response = client.attach_role_policy(
                RoleName=node_group_role_name,
                PolicyArn=policy_arn,
            )
        except ClientError as e:
            logger.exception(e)
            return

    # Create custom ECR all access policy and attach to node_group_role
    ecr_all_access_policy_name = "AWS-ECR-Full-Access-{}".format(
        environment_suffix
    )
    ecr_all_access_policy_arn = None
    try:
        response = client.create_policy(
            PolicyName=ecr_all_access_policy_name,
            PolicyDocument=json.dumps(settings.ECR_ALL_ACCESS_POLICY_DOCUMENT),
        )
        ecr_all_access_policy_arn = response["Policy"]["Arn"]
        waiter = client.get_waiter("policy_exists")
        waiter.wait(PolicyArn=ecr_all_access_policy_arn)
        # Attach custom ECR policy
        response = client.attach_role_policy(
            RoleName=node_group_role_name, PolicyArn=ecr_all_access_policy_arn
        )
    except ClientError as e:
        logger.exception(e)
        return
    try:
        challenge_evaluation_cluster = ChallengeEvaluationCluster.objects.get(
            challenge=challenge_obj
        )
        serializer = ChallengeEvaluationClusterSerializer(
            challenge_evaluation_cluster,
            data={
                "eks_arn_role": eks_arn_role,
                "node_group_arn_role": node_group_arn_role,
                "ecr_all_access_policy_arn": ecr_all_access_policy_arn,
            },
            partial=True,
        )
        if serializer.is_valid():
            serializer.save()
        # Create eks cluster vpc and subnets
        create_eks_cluster_subnets.delay(challenge)
    except Exception as e:
        logger.exception(e)
        return


@app.task
def create_eks_cluster_subnets(challenge):
    """
    Creates the VPC, subnets, internet gateway, route table, security groups and EFS for the challenge's EKS cluster.

    Arguments:
        challenge {str} -- JSON-serialized challenge object (deserialized below)
    """
    from .models import ChallengeEvaluationCluster
    from .serializers import ChallengeEvaluationClusterSerializer
    from .utils import get_aws_credentials_for_challenge

    for obj in serializers.deserialize("json", challenge):
        challenge_obj = obj.object
    challenge_aws_keys = get_aws_credentials_for_challenge(challenge_obj.pk)
    environment_suffix = "{}-{}".format(challenge_obj.pk, settings.ENVIRONMENT)
    client = get_boto3_client("ec2", challenge_aws_keys)
    vpc_ids = []
    try:
        response = client.create_vpc(CidrBlock=challenge_obj.vpc_cidr)
        vpc_ids.append(response["Vpc"]["VpcId"])
    except ClientError as e:
        logger.exception(e)
        return

    waiter = client.get_waiter("vpc_available")
    waiter.wait(VpcIds=vpc_ids)

    # Create internet gateway and attach to vpc
    try:
        # Enable DNS hostnames for the VPC
        response = client.modify_vpc_attribute(
            EnableDnsHostnames={"Value": True}, VpcId=vpc_ids[0]
        )

        response = client.create_internet_gateway()
        internet_gateway_id = response["InternetGateway"]["InternetGatewayId"]
        client.attach_internet_gateway(
            InternetGatewayId=internet_gateway_id, VpcId=vpc_ids[0]
        )

        # Create and attach route table
        response = client.create_route_table(VpcId=vpc_ids[0])
        route_table_id = response["RouteTable"]["RouteTableId"]
        client.create_route(
            DestinationCidrBlock="0.0.0.0/0",
            GatewayId=internet_gateway_id,
            RouteTableId=route_table_id,
        )

        # Create subnets
        subnet_ids = []
        response = client.create_subnet(
            CidrBlock=challenge_obj.subnet_1_cidr,
            AvailabilityZone="us-east-1a",
            VpcId=vpc_ids[0],
        )
        subnet_1_id = response["Subnet"]["SubnetId"]
        subnet_ids.append(subnet_1_id)

        response = client.create_subnet(
            CidrBlock=challenge_obj.subnet_2_cidr,
            AvailabilityZone="us-east-1b",
            VpcId=vpc_ids[0],
        )
        subnet_2_id = response["Subnet"]["SubnetId"]
        subnet_ids.append(subnet_2_id)

        waiter = client.get_waiter("subnet_available")
        waiter.wait(SubnetIds=subnet_ids)

        # Managed node groups require subnets that auto-assign public IPv4 addresses
        for subnet_id in subnet_ids:
            response = client.modify_subnet_attribute(
                MapPublicIpOnLaunch={
                    "Value": True,
                },
                SubnetId=subnet_id,
            )

        # Associate route table with subnets
        response = client.associate_route_table(
            RouteTableId=route_table_id,
            SubnetId=subnet_1_id,
        )

        response = client.associate_route_table(
            RouteTableId=route_table_id,
            SubnetId=subnet_2_id,
        )

        # Create security group
        response = client.create_security_group(
            GroupName="EvalAI code upload challenge",
            Description="EvalAI code upload challenge worker group",
            VpcId=vpc_ids[0],
        )
        security_group_id = response["GroupId"]

        response = client.create_security_group(
            GroupName="evalai-code-upload-challenge-efs-{}".format(
                environment_suffix
            ),
            Description="EKS nodegroup EFS",
            VpcId=vpc_ids[0],
        )
        efs_security_group_id = response["GroupId"]

        response = client.authorize_security_group_ingress(
            GroupId=efs_security_group_id,
            IpPermissions=[
                {
                    "FromPort": 2049,
                    "IpProtocol": "tcp",
                    "IpRanges": [
                        {
                            "CidrIp": challenge_obj.vpc_cidr,
                        },
                    ],
                    "ToPort": 2049,
                }
            ],
        )

        # Create EFS
        efs_client = get_boto3_client("efs", challenge_aws_keys)
        # A UUID string is 36 chars, well within EFS's 64-char CreationToken limit
        efs_creation_token = str(uuid.uuid4())[:64]
        response = efs_client.create_file_system(
            CreationToken=efs_creation_token,
        )
        efs_id = response["FileSystemId"]

        challenge_evaluation_cluster = ChallengeEvaluationCluster.objects.get(
            challenge=challenge_obj
        )
        serializer = ChallengeEvaluationClusterSerializer(
            challenge_evaluation_cluster,
            data={
                "vpc_id": vpc_ids[0],
                "internet_gateway_id": internet_gateway_id,
                "route_table_id": route_table_id,
                "security_group_id": security_group_id,
                "subnet_1_id": subnet_1_id,
                "subnet_2_id": subnet_2_id,
                "efs_security_group_id": efs_security_group_id,
                "efs_id": efs_id,
                "efs_creation_token": efs_creation_token,
            },
            partial=True,
        )
        if serializer.is_valid():
            serializer.save()
        else:
            logger.error(serializer.errors)
        # Create the EKS cluster
        create_eks_cluster.delay(challenge)
    except ClientError as e:
        logger.exception(e)
        return
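

# Hedged sketch: create_eks_cluster_subnets assumes vpc_cidr, subnet_1_cidr,
# and subnet_2_cidr are already stored on the challenge and that both subnet
# blocks fall inside the VPC block. A quick sanity check (illustrative, not
# part of the production flow) could look like this:
def _example_validate_challenge_cidrs(challenge_obj):
    """Illustrative only: verify the subnet CIDRs nest inside the VPC CIDR."""
    import ipaddress

    vpc_net = ipaddress.ip_network(challenge_obj.vpc_cidr)
    return all(
        ipaddress.ip_network(cidr).subnet_of(vpc_net)
        for cidr in (challenge_obj.subnet_1_cidr, challenge_obj.subnet_2_cidr)
    )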


@app.task
def create_eks_cluster(challenge):
    """
    Creates the EKS cluster and EFS mount targets once a code-upload
    challenge is approved by the EvalAI admin, then triggers the
    create_eks_nodegroup task.

    Arguments:
        challenge {str} -- JSON-serialized challenge instance (as produced
            by django.core.serializers)
    """
    from .models import ChallengeEvaluationCluster
    from .serializers import ChallengeEvaluationClusterSerializer
    from .utils import get_aws_credentials_for_challenge

    for obj in serializers.deserialize("json", challenge):
        challenge_obj = obj.object
    environment_suffix = "{}-{}".format(challenge_obj.pk, settings.ENVIRONMENT)
    cluster_name = "{}-{}-cluster".format(
        challenge_obj.title.replace(" ", "-"), environment_suffix
    )
    if challenge_obj.approved_by_admin and challenge_obj.is_docker_based:
        challenge_aws_keys = get_aws_credentials_for_challenge(
            challenge_obj.pk
        )
        client = get_boto3_client("eks", challenge_aws_keys)
        cluster_meta = get_code_upload_setup_meta_for_challenge(
            challenge_obj.pk
        )
        try:
            response = client.create_cluster(
                name=cluster_name,
                version="1.23",
                roleArn=cluster_meta["EKS_CLUSTER_ROLE_ARN"],
                resourcesVpcConfig={
                    "subnetIds": [
                        cluster_meta["SUBNET_1"],
                        cluster_meta["SUBNET_2"],
                    ],
                    "securityGroupIds": [
                        cluster_meta["SUBNET_SECURITY_GROUP"]
                    ],
                },
            )
            waiter = client.get_waiter("cluster_active")
            waiter.wait(name=cluster_name)
            # creating kubeconfig
            cluster = client.describe_cluster(name=cluster_name)
            cluster_cert = cluster["cluster"]["certificateAuthority"]["data"]
            cluster_ep = cluster["cluster"]["endpoint"]
            cluster_config = {
                "apiVersion": "v1",
                "kind": "Config",
                "clusters": [
                    {
                        "cluster": {
                            "server": str(cluster_ep),
                            "certificate-authority-data": str(cluster_cert),
                        },
                        "name": "kubernetes",
                    }
                ],
                "contexts": [
                    {
                        "context": {"cluster": "kubernetes", "user": "aws"},
                        "name": "aws",
                    }
                ],
                "current-context": "aws",
                "preferences": {},
                "users": [
                    {
                        "name": "aws",
                        "user": {
                            "exec": {
                                "apiVersion": "client.authentication.k8s.io/v1alpha1",
                                "command": "heptio-authenticator-aws",
                                "args": ["token", "-i", cluster_name],
                            }
                        },
                    }
                ],
            }

            # Write the kubeconfig out as YAML.
            config_text = yaml.dump(cluster_config, default_flow_style=False)
            config_file = NamedTemporaryFile(delete=True)
            config_file.write(config_text.encode())
            config_file.flush()  # flush so the on-disk file is complete before any consumer opens it
            challenge_evaluation_cluster = (
                ChallengeEvaluationCluster.objects.get(challenge=challenge_obj)
            )

            efs_client = get_boto3_client("efs", challenge_aws_keys)
            # Create mount targets for subnets
            mount_target_ids = []
            response = efs_client.create_mount_target(
                FileSystemId=challenge_evaluation_cluster.efs_id,
                SubnetId=challenge_evaluation_cluster.subnet_1_id,
                SecurityGroups=[
                    challenge_evaluation_cluster.efs_security_group_id
                ],
            )
            mount_target_ids.append(response["MountTargetId"])

            response = efs_client.create_mount_target(
                FileSystemId=challenge_evaluation_cluster.efs_id,
                SubnetId=challenge_evaluation_cluster.subnet_2_id,
                SecurityGroups=[
                    challenge_evaluation_cluster.efs_security_group_id
                ],
            )
            mount_target_ids.append(response["MountTargetId"])

            serializer = ChallengeEvaluationClusterSerializer(
                challenge_evaluation_cluster,
                data={
                    "name": cluster_name,
                    "cluster_endpoint": cluster_ep,
                    "cluster_ssl": cluster_cert,
                    "efs_mount_target_ids": mount_target_ids,
                },
                partial=True,
            )
            if serializer.is_valid():
                serializer.save()
            else:
                logger.error(serializer.errors)
            # Create the node group
            create_eks_nodegroup.delay(challenge, cluster_name)
            return response
        except ClientError as e:
            logger.exception(e)
            return
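

# Hedged sketch: the kubeconfig assembled in create_eks_cluster can be
# consumed with the official kubernetes Python client, assuming it has been
# persisted somewhere readable. The helper below is illustrative; the
# kube_config_path parameter is a hypothetical stand-in for wherever the
# config is actually stored.
def _example_build_k8s_api_client(kube_config_path):
    """Illustrative only: build a CoreV1Api client from the generated kubeconfig."""
    from kubernetes import client as k8s_client, config as k8s_config

    k8s_config.load_kube_config(config_file=kube_config_path)
    return k8s_client.CoreV1Api()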


def challenge_approval_callback(sender, instance, field_name, **kwargs):
    """This is to check if a challenge has been approved or disapproved since last time.

    On approval of a challenge, it launches a worker on Fargate.
    On disapproval, it scales down the workers to 0, and deletes the challenge's service on Fargate.

    Arguments:
        sender -- The model which initated this callback (Challenge)
        instance {<class 'django.db.models.query.QuerySet'>} -- instance of the model (a challenge object)
        field_name {str} -- The name of the field to check for a change (approved_by_admin)

    """
    prev = getattr(instance, "_original_{}".format(field_name))
    curr = getattr(instance, "{}".format(field_name))
    challenge = instance
    challenge._original_approved_by_admin = curr

    if (
        not challenge.is_docker_based
        and not challenge.uses_ec2_worker
        and challenge.remote_evaluation is False
    ):
        if curr and not prev:
            if not challenge.workers:
                response = start_workers([challenge])
                count, failures = response["count"], response["failures"]
                if not count:
                    logger.error(
                        "Worker for challenge {} couldn't start! Error: {}".format(
                            challenge.id, failures[0]["message"]
                        )
                    )
                else:
                    construct_and_send_worker_start_mail(challenge)

        if prev and not curr:
            if challenge.workers:
                response = delete_workers([challenge])
                count, failures = response["count"], response["failures"]
                if not count:
                    logger.error(
                        "Worker for challenge {} couldn't be deleted! Error: {}".format(
                            challenge.id, failures[0]["message"]
                        )
                    )
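

# Hedged sketch: one plausible way challenge_approval_callback gets wired up;
# the real wiring lives in the models module and may differ. A
# functools.partial can pin the watched field before connecting to the
# post_save signal (all names here are illustrative assumptions):
def _example_connect_approval_callback():
    """Illustrative only: connect the approval callback to Challenge saves."""
    from functools import partial

    from django.db.models.signals import post_save

    from .models import Challenge

    post_save.connect(
        partial(challenge_approval_callback, field_name="approved_by_admin"),
        sender=Challenge,
        weak=False,
    )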


@app.task
def setup_ec2(challenge):
    """
    Creates EC2 instance for the challenge and spawns a worker container.

    Arguments:
        challenge {<class 'django.db.models.query.QuerySet'>} -- instance of the model calling the post hook
    """
    for obj in serializers.deserialize("json", challenge):
        challenge_obj = obj.object
    if challenge_obj.ec2_instance_id:
        return start_ec2_instance(challenge_obj)
    return create_ec2_instance(challenge_obj)
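

# Hedged sketch: start_ec2_instance and create_ec2_instance are defined
# earlier in this module; at the boto3 level the start-or-create split is
# roughly the following. The helper name and the ImageId are illustrative
# placeholders, not the production values.
def _example_start_or_create_worker_instance(challenge_obj, aws_credentials):
    """Illustrative only: restart an existing worker instance or launch a new one."""
    ec2 = get_boto3_client("ec2", aws_credentials)
    if challenge_obj.ec2_instance_id:
        return ec2.start_instances(
            InstanceIds=[challenge_obj.ec2_instance_id]
        )
    return ec2.run_instances(
        MinCount=1,
        MaxCount=1,
        ImageId="ami-xxxxxxxxxxxxxxxxx",  # hypothetical AMI id
    )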


@app.task
def update_sqs_retention_period_task(challenge):
    """
    Updates sqs retention period for a challenge when the attribute is changed.

    Arguments:
        challenge {<class 'django.db.models.query.QuerySet'>} -- instance of the model calling the post hook
    """
    for obj in serializers.deserialize("json", challenge):
        challenge_obj = obj.object
    return update_sqs_retention_period(challenge_obj)
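

# Hedged sketch: update_sqs_retention_period (defined earlier in this module)
# ultimately issues a call along these lines; the helper and its parameters
# are illustrative.
def _example_set_queue_retention(queue_url, retention_seconds, aws_credentials):
    """Illustrative only: set an SQS queue's message retention period."""
    sqs = get_boto3_client("sqs", aws_credentials)
    return sqs.set_queue_attributes(
        QueueUrl=queue_url,
        Attributes={"MessageRetentionPeriod": str(retention_seconds)},
    )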