Cloud-CV/EvalAI

View on GitHub
apps/jobs/sender.py

Summary

Maintainability
C
1 day
Test Coverage
from __future__ import absolute_import

import json
import logging
import os

import boto3
import botocore
from base.utils import send_slack_notification
from challenges.models import Challenge
from django.conf import settings

from monitoring.statsd.metrics import NUM_SUBMISSIONS_IN_QUEUE, increment_statsd_counter
from settings.common import SQS_RETENTION_PERIOD

from .utils import get_submission_model

logger = logging.getLogger(__name__)


def get_or_create_sqs_queue(queue_name, challenge=None):
    """
    Args:
        queue_name: Name of the SQS Queue
    Returns:
        Returns the SQS Queue object
    """
    if settings.DEBUG or settings.TEST:
        sqs = boto3.resource(
            "sqs",
            endpoint_url=os.environ.get("AWS_SQS_ENDPOINT", "http://sqs:9324"),
            region_name=os.environ.get("AWS_DEFAULT_REGION", "us-east-1"),
            aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY", "x"),
            aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID", "x"),
        )
        # Use default queue name in dev and test environment
        queue_name = "evalai_submission_queue"
    else:
        if challenge and challenge.use_host_sqs:
            sqs = boto3.resource(
                "sqs",
                region_name=challenge.queue_aws_region,
                aws_secret_access_key=challenge.aws_secret_access_key,
                aws_access_key_id=challenge.aws_access_key_id,
            )
        else:
            sqs = boto3.resource(
                "sqs",
                region_name=os.environ.get("AWS_DEFAULT_REGION", "us-east-1"),
                aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY"),
                aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID"),
            )

    if queue_name == "":
        queue_name = "evalai_submission_queue"

    # Check if the queue exists. If not, then create one.
    try:
        queue = sqs.get_queue_by_name(QueueName=queue_name)
    except botocore.exceptions.ClientError as ex:
        if (
            ex.response["Error"]["Code"]
            == "AWS.SimpleQueueService.NonExistentQueue"
        ):
            sqs_retention_period = SQS_RETENTION_PERIOD if challenge is None else str(challenge.sqs_retention_period)
            queue = sqs.create_queue(
                QueueName=queue_name,
                Attributes={"MessageRetentionPeriod": sqs_retention_period},
            )
        else:
            logger.exception("Cannot get or create Queue")
    return queue


def publish_submission_message(message):
    """
    Args:
        message: A Dict with following keys
            - "challenge_pk": int
            - "phase_pk": int
            - "submission_pk": int
            - "submitted_image_uri": str, (only available when the challenge is a code upload challenge)
            - "is_static_dataset_code_upload_submission": bool

    Returns:
        Returns SQS response
    """

    try:
        challenge = Challenge.objects.get(pk=message["challenge_pk"])
    except Challenge.DoesNotExist:
        logger.exception(
            "Challenge does not exist for the given id {}".format(
                message["challenge_pk"]
            )
        )
        return
    queue_name = challenge.queue
    slack_url = challenge.slack_webhook_url
    is_remote = challenge.remote_evaluation
    queue = get_or_create_sqs_queue(queue_name, challenge)
    # increase counter for submission pushed into queue
    submission_metric_tags = [
        "queue_name:%s" % queue_name,
        "is_remote:%d" % int(is_remote),
    ]
    increment_statsd_counter(NUM_SUBMISSIONS_IN_QUEUE, submission_metric_tags, 1)
    response = queue.send_message(MessageBody=json.dumps(message))
    # send slack notification
    if slack_url:
        challenge_name = challenge.title
        submission = get_submission_model(message["submission_pk"])
        participant_team_name = submission.participant_team.team_name
        phase_name = submission.challenge_phase.name
        message = {
            "text": "A *new submission* has been uploaded to {}".format(
                challenge_name
            ),
            "fields": [
                {
                    "title": "Challenge Phase",
                    "value": phase_name,
                    "short": True,
                },
                {
                    "title": "Participant Team Name",
                    "value": participant_team_name,
                    "short": True,
                },
                {
                    "title": "Submission Id",
                    "value": message["submission_pk"],
                    "short": True,
                },
            ],
        }
        send_slack_notification(slack_url, message)
    return response