scripts/workers/submission_worker.py
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import contextlib
import importlib
import json
import logging
import os
import shutil
import signal
import subprocess
import sys
import tempfile
import time
import traceback
import zipfile
from os.path import join
import boto3
import botocore
import django
import requests
import yaml
from django.core.files.base import ContentFile
from django.utils import timezone
from settings.common import SQS_RETENTION_PERIOD
from .statsd_utils import increment_and_push_metrics_to_statsd
# all challenge and submission will be stored in temp directory
BASE_TEMP_DIR = tempfile.mkdtemp(prefix='tmp')
COMPUTE_DIRECTORY_PATH = join(BASE_TEMP_DIR, "compute")
formatter = logging.Formatter(
"[%(asctime)s] %(levelname)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(formatter)
logger = logging.getLogger(__name__)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
django.setup()
from challenges.models import (Challenge, ChallengePhase, # noqa:E402
ChallengePhaseSplit, LeaderboardData)
# Load django app settings
from django.conf import settings # noqa
from jobs.models import Submission # noqa:E402
from jobs.serializers import SubmissionSerializer # noqa:E402
LIMIT_CONCURRENT_SUBMISSION_PROCESSING = os.environ.get(
"LIMIT_CONCURRENT_SUBMISSION_PROCESSING"
)
DJANGO_SETTINGS_MODULE = os.environ.get(
"DJANGO_SETTINGS_MODULE", "settings.dev"
)
CHALLENGE_DATA_BASE_DIR = join(COMPUTE_DIRECTORY_PATH, "challenge_data")
SUBMISSION_DATA_BASE_DIR = join(COMPUTE_DIRECTORY_PATH, "submission_files")
CHALLENGE_DATA_DIR = join(CHALLENGE_DATA_BASE_DIR, "challenge_{challenge_id}")
PHASE_DATA_BASE_DIR = join(CHALLENGE_DATA_DIR, "phase_data")
PHASE_DATA_DIR = join(PHASE_DATA_BASE_DIR, "phase_{phase_id}")
PHASE_ANNOTATION_FILE_PATH = join(PHASE_DATA_DIR, "{annotation_file}")
SUBMISSION_DATA_DIR = join(
SUBMISSION_DATA_BASE_DIR, "submission_{submission_id}"
)
SUBMISSION_INPUT_FILE_PATH = join(SUBMISSION_DATA_DIR, "{input_file}")
CHALLENGE_IMPORT_STRING = "challenge_data.challenge_{challenge_id}"
EVALUATION_SCRIPTS = {}
# map of challenge id : phase id : phase annotation file name
# Use: On arrival of submission message, lookup here to fetch phase file name
# this saves db query just to fetch phase annotation file name
PHASE_ANNOTATION_FILE_NAME_MAP = {}
WORKER_LOGS_PREFIX = "WORKER_LOG"
SUBMISSION_LOGS_PREFIX = "SUBMISSION_LOG"
django.db.close_old_connections()
class GracefulKiller:
kill_now = False
def __init__(self):
signal.signal(signal.SIGINT, self.exit_gracefully)
signal.signal(signal.SIGTERM, self.exit_gracefully)
def exit_gracefully(self, signum, frame):
self.kill_now = True
class ExecutionTimeLimitExceeded(Exception):
pass
class MultiOut(object):
def __init__(self, *args):
self.handles = args
def write(self, s):
for f in self.handles:
f.write(s)
def flush(self):
for f in self.handles:
f.flush()
@contextlib.contextmanager
def stdout_redirect(where):
sys.stdout = where
try:
yield where
finally:
sys.stdout = sys.__stdout__
@contextlib.contextmanager
def stderr_redirect(where):
sys.stderr = where
try:
yield where
finally:
sys.stderr = sys.__stderr__
def alarm_handler(signum, frame):
raise ExecutionTimeLimitExceeded
def download_and_extract_file(url, download_location):
"""
* Function to extract download a file.
* `download_location` should include name of file as well.
"""
try:
response = requests.get(url, stream=True)
except Exception as e:
logger.error(
"{} Failed to fetch file from {}, error {}".format(
WORKER_LOGS_PREFIX, url, e
)
)
traceback.print_exc()
response = None
if response and response.status_code == 200:
with open(download_location, "wb") as f:
for chunk in response.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
def extract_zip_file(download_location, extract_location):
"""
Helper function to extract zip file
Params:
* `download_location`: Location of zip file
* `extract_location`: Location of directory for extracted file
"""
zip_ref = zipfile.ZipFile(download_location, "r")
zip_ref.extractall(extract_location)
zip_ref.close()
def delete_zip_file(download_location):
"""
Helper function to remove zip file from location `download_location`
Params:
* `download_location`: Location of file to be removed.
"""
try:
os.remove(download_location)
except Exception as e:
logger.error(
"{} Failed to remove zip file {}, error {}".format(
WORKER_LOGS_PREFIX, download_location, e
)
)
traceback.print_exc()
def delete_submission_data_directory(location):
"""
Helper function to delete submission data from location `location`
Arguments:
location {[string]} -- Location of directory to be removed.
"""
try:
shutil.rmtree(location)
except Exception as e:
logger.exception(
"{} Failed to delete submission data directory {}, error {}".format(
WORKER_LOGS_PREFIX, location, e
)
)
def delete_old_temp_directories(prefix='tmp'):
temp_dir = tempfile.gettempdir()
dir_creation_times = {}
for root, dirs, files in os.walk(temp_dir):
for directory in dirs:
if directory.startswith(prefix):
dir_path = os.path.join(root, directory)
try:
creation_time = os.path.getctime(dir_path)
dir_creation_times[dir_path] = creation_time
except Exception as e:
logger.info(f"Error getting creation time for directory {dir_path}: {e}")
latest_dir = max(dir_creation_times, key=dir_creation_times.get, default=None)
for dir_path in dir_creation_times:
if dir_path != latest_dir:
try:
shutil.rmtree(dir_path)
logger.info(f"Deleted directory: {dir_path}")
except Exception as e:
logger.info(f"Error deleting directory {dir_path}: {e}")
def download_and_extract_zip_file(url, download_location, extract_location):
"""
* Function to extract download a zip file, extract it and then removes the zip file.
* `download_location` should include name of file as well.
"""
try:
response = requests.get(url, stream=True)
except Exception as e:
logger.error(
"{} Failed to fetch file from {}, error {}".format(
WORKER_LOGS_PREFIX, url, e
)
)
response = None
if response and response.status_code == 200:
with open(download_location, "wb") as f:
for chunk in response.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
# extract zip file
extract_zip_file(download_location, extract_location)
# delete zip file
delete_zip_file(download_location)
def create_dir(directory):
"""
Creates a directory if it does not exists
"""
if not os.path.exists(directory):
os.makedirs(directory)
def create_dir_as_python_package(directory):
"""
Create a directory and then makes it a python
package by creating `__init__.py` file.
"""
create_dir(directory)
init_file_path = join(directory, "__init__.py")
with open(init_file_path, "w") as init_file: # noqa
# to create empty file
pass
def return_file_url_per_environment(url):
if (
DJANGO_SETTINGS_MODULE == "settings.dev"
or DJANGO_SETTINGS_MODULE == "settings.test"
):
base_url = (
f"http://{settings.DJANGO_SERVER}:{settings.DJANGO_SERVER_PORT}"
)
url = "{0}{1}".format(base_url, url)
return url
def extract_challenge_data(challenge, phases):
"""
* Expects a challenge object and an array of phase object
* Extracts `evaluation_script` for challenge and `annotation_file` for each phase
"""
challenge_data_directory = CHALLENGE_DATA_DIR.format(
challenge_id=challenge.id
)
# create challenge directory as package
create_dir_as_python_package(challenge_data_directory)
evaluation_script_url = challenge.evaluation_script.url
evaluation_script_url = return_file_url_per_environment(
evaluation_script_url
)
# set entry in map
PHASE_ANNOTATION_FILE_NAME_MAP[challenge.id] = {}
challenge_zip_file = join(
challenge_data_directory, "challenge_{}.zip".format(challenge.id)
)
download_and_extract_zip_file(
evaluation_script_url, challenge_zip_file, challenge_data_directory
)
try:
requirements_location = join(challenge_data_directory, "requirements.txt")
if os.path.isfile(requirements_location):
subprocess.check_output([sys.executable, "-m", "pip", "install", "-r", requirements_location])
else:
logger.info("No custom requirements for challenge {}".format(challenge.id))
except Exception as e:
logger.error(e)
phase_data_base_directory = PHASE_DATA_BASE_DIR.format(
challenge_id=challenge.id
)
create_dir(phase_data_base_directory)
for phase in phases:
phase_data_directory = PHASE_DATA_DIR.format(
challenge_id=challenge.id, phase_id=phase.id
)
# create phase directory
create_dir(phase_data_directory)
annotation_file_url = phase.test_annotation.url
annotation_file_url = return_file_url_per_environment(
annotation_file_url
)
annotation_file_name = os.path.basename(phase.test_annotation.name)
PHASE_ANNOTATION_FILE_NAME_MAP[challenge.id][
phase.id
] = annotation_file_name
annotation_file_path = PHASE_ANNOTATION_FILE_PATH.format(
challenge_id=challenge.id,
phase_id=phase.id,
annotation_file=annotation_file_name,
)
download_and_extract_file(annotation_file_url, annotation_file_path)
try:
# import the challenge after everything is finished
importlib.invalidate_caches()
challenge_module = importlib.import_module(
CHALLENGE_IMPORT_STRING.format(challenge_id=challenge.id)
)
EVALUATION_SCRIPTS[challenge.id] = challenge_module
challenge.evaluation_module_error = None
challenge.save()
except Exception:
# Catch the exception and save the traceback in the Challenge object's errors attribute
traceback_msg = traceback.format_exc()
challenge.evaluation_module_error = traceback_msg
challenge.save()
logger.exception(
"{} Exception raised while creating Python module for challenge_id: {}".format(
WORKER_LOGS_PREFIX, challenge.id
)
)
raise
def load_challenge(challenge):
"""
Creates python package for a challenge and extracts relevant data
"""
# make sure that the challenge base directory exists
create_dir_as_python_package(CHALLENGE_DATA_BASE_DIR)
phases = challenge.challengephase_set.all()
extract_challenge_data(challenge, phases)
def extract_submission_data(submission_id):
"""
* Expects submission id and extracts input file for it.
"""
try:
submission = Submission.objects.get(id=submission_id)
except Submission.DoesNotExist:
logger.critical(
"{} Submission {} does not exist".format(
SUBMISSION_LOGS_PREFIX, submission_id
)
)
traceback.print_exc()
# return from here so that the message can be acked
# This also indicates that we don't want to take action
# for message corresponding to which submission entry
# does not exist
return None
# Ignore submissions with status cancelled
if submission.status == Submission.CANCELLED:
logger.info(
"{} Submission {} was cancelled by the user".format(
SUBMISSION_LOGS_PREFIX, submission_id
)
)
return None
if submission.challenge_phase.challenge.is_static_dataset_code_upload:
input_file = submission.submission_input_file
else:
input_file = submission.input_file
submission_input_file = input_file.url
submission_input_file = return_file_url_per_environment(
submission_input_file
)
submission_data_directory = SUBMISSION_DATA_DIR.format(
submission_id=submission.id
)
submission_input_file_name = os.path.basename(input_file.name)
submission_input_file_path = SUBMISSION_INPUT_FILE_PATH.format(
submission_id=submission.id, input_file=submission_input_file_name
)
# create submission directory
create_dir_as_python_package(submission_data_directory)
download_and_extract_file(
submission_input_file, submission_input_file_path
)
return submission
def run_submission(
challenge_id, challenge_phase, submission, user_annotation_file_path
):
"""
* receives a challenge id, phase id and user annotation file path
* checks whether the corresponding evaluation script for the challenge exists or not
* checks the above for annotation file
* calls evaluation script via subprocess passing annotation file and user_annotation_file_path as argument
"""
# Use the submission serializer to send relevant data to evaluation script
# so that challenge hosts can use data for webhooks or any other service.
submission_serializer = SubmissionSerializer(submission)
submission_output = None
phase_id = challenge_phase.id
annotation_file_name = PHASE_ANNOTATION_FILE_NAME_MAP.get(
challenge_id
).get(phase_id)
annotation_file_path = PHASE_ANNOTATION_FILE_PATH.format(
challenge_id=challenge_id,
phase_id=phase_id,
annotation_file=annotation_file_name,
)
submission_data_dir = SUBMISSION_DATA_DIR.format(
submission_id=submission.id
)
submission.status = Submission.RUNNING
submission.started_at = timezone.now()
submission.save()
# create a temporary run directory under submission directory, so that
# main directory does not gets polluted
temp_run_dir = join(submission_data_dir, "run")
create_dir(temp_run_dir)
stdout_file = join(temp_run_dir, "temp_stdout.txt")
stderr_file = join(temp_run_dir, "temp_stderr.txt")
stdout = open(stdout_file, "a+")
stderr = open(stderr_file, "a+")
remote_evaluation = submission.challenge_phase.challenge.remote_evaluation
if remote_evaluation:
try:
logger.info(
"{} Sending submission {} for remote evaluation".format(
SUBMISSION_LOGS_PREFIX, submission.id
)
)
with stdout_redirect(
MultiOut(stdout, sys.__stdout__)
) as new_stdout, stderr_redirect( # noqa
MultiOut(stderr, sys.__stderr__)
) as new_stderr: # noqa
submission_output = EVALUATION_SCRIPTS[challenge_id].evaluate(
annotation_file_path,
user_annotation_file_path,
challenge_phase.codename,
submission_metadata=submission_serializer.data,
)
return
except Exception:
stderr.write(traceback.format_exc())
stderr.close()
stdout.close()
submission.status = Submission.FAILED
submission.completed_at = timezone.now()
submission.save()
if not challenge_phase.disable_logs:
with open(stdout_file, "r") as stdout:
stdout_content = stdout.read()
submission.stdout_file.save(
"stdout.txt", ContentFile(stdout_content)
)
with open(stderr_file, "r") as stderr:
stderr_content = stderr.read()
submission.stderr_file.save(
"stderr.txt", ContentFile(stderr_content)
)
# delete the complete temp run directory
shutil.rmtree(temp_run_dir)
return
# call `main` from globals and set `status` to running and hence `started_at`
try:
successful_submission_flag = True
with stdout_redirect(
MultiOut(stdout, sys.__stdout__)
) as new_stdout, stderr_redirect( # noqa
MultiOut(stderr, sys.__stderr__)
) as new_stderr: # noqa
submission_output = EVALUATION_SCRIPTS[challenge_id].evaluate(
annotation_file_path,
user_annotation_file_path,
challenge_phase.codename,
submission_metadata=submission_serializer.data,
)
"""
A submission will be marked successful only if it is of the format
{
"result":[
{
"split_codename_1":{
"key1":30,
"key2":50,
}
},
{
"split_codename_2":{
"key1":90,
"key2":10,
}
},
{
"split_codename_3":{
"key1":100,
"key2":45,
}
}
],
"submission_metadata": {'foo': 'bar'},
"submission_result": ['foo', 'bar'],
}
"""
error_bars_dict = dict()
if "error" in submission_output:
for split_error in submission_output["error"]:
split_code_name = list(split_error.keys())[0]
error_bars_dict[split_code_name] = split_error[split_code_name]
if "result" in submission_output:
leaderboard_data_list = []
for split_result in submission_output["result"]:
# get split_code_name that is the key of the result
split_code_name = list(split_result.keys())[0]
# Check if the challenge_phase_split exists for the challenge_phaseand dataset_split
try:
challenge_phase_split = ChallengePhaseSplit.objects.get(
challenge_phase=challenge_phase,
dataset_split__codename=split_code_name,
)
except Exception:
stderr.write(
"ORGINIAL EXCEPTION: No such relation between Challenge Phase and DatasetSplit"
" specified by Challenge Host \n"
)
stderr.write(traceback.format_exc())
successful_submission_flag = False
break
# Check if the dataset_split exists for the codename in the result
try:
dataset_split = challenge_phase_split.dataset_split
except Exception:
stderr.write(
"ORGINIAL EXCEPTION: The codename specified by your Challenge Host doesn't match"
" with that in the evaluation Script.\n"
)
stderr.write(traceback.format_exc())
successful_submission_flag = False
break
leaderboard_data = LeaderboardData()
leaderboard_data.challenge_phase_split = challenge_phase_split
leaderboard_data.submission = submission
leaderboard_data.leaderboard = (
challenge_phase_split.leaderboard
)
leaderboard_data.result = split_result.get(
dataset_split.codename
)
leaderboard_data.is_disabled = False
if "error" in submission_output:
leaderboard_data.error = error_bars_dict.get(
dataset_split.codename
)
leaderboard_data_list.append(leaderboard_data)
if successful_submission_flag:
LeaderboardData.objects.bulk_create(leaderboard_data_list)
# Once the submission_output is processed, then save the submission object with appropriate status
else:
successful_submission_flag = False
except Exception:
stderr.write(traceback.format_exc())
successful_submission_flag = False
# Set submission_output to None to handle case when evaluation script throws exception
# In case of exception from evaluation script submission_output is assigned exception object
submission_output = None
submission_status = (
Submission.FINISHED
if successful_submission_flag
else Submission.FAILED
)
submission.status = submission_status
submission.completed_at = timezone.now()
submission.save()
# after the execution is finished, set `status` to finished and hence `completed_at`
if submission_output and successful_submission_flag:
output = {}
output["result"] = submission_output.get("result", "")
submission.output = output
# Save submission_result_file
submission_result = submission_output.get("submission_result", "")
submission_result = json.dumps(submission_result)
submission.submission_result_file.save(
"submission_result.json", ContentFile(submission_result)
)
# Save submission_metadata_file
submission_metadata = submission_output.get("submission_metadata", "")
submission.submission_metadata_file.save(
"submission_metadata.json", ContentFile(submission_metadata)
)
submission.save()
stderr.close()
stdout.close()
stderr_content = open(stderr_file, "r").read()
stdout_content = open(stdout_file, "r").read()
# TODO :: see if two updates can be combine into a single update.
if not challenge_phase.disable_logs:
with open(stdout_file, "r") as stdout:
stdout_content = stdout.read()
submission.stdout_file.save("stdout.txt", ContentFile(stdout_content))
if submission_status is Submission.FAILED:
with open(stderr_file, "r") as stderr:
stderr_content = stderr.read().encode("utf-8")
submission.stderr_file.save(
"stderr.txt", ContentFile(stderr_content)
)
# delete the complete temp run directory
shutil.rmtree(temp_run_dir)
def process_submission_message(message):
"""
Extracts the submission related metadata from the message
and send the submission object for evaluation
"""
challenge_id = message.get("challenge_pk")
phase_id = message.get("phase_pk")
submission_id = message.get("submission_pk")
submission_instance = extract_submission_data(submission_id)
# so that the further execution does not happen
if not submission_instance:
return
try:
challenge_phase = ChallengePhase.objects.get(id=phase_id)
except ChallengePhase.DoesNotExist:
logger.exception(
"{} Challenge Phase {} does not exist".format(
WORKER_LOGS_PREFIX, phase_id
)
)
raise
if (
submission_instance.challenge_phase.challenge.is_static_dataset_code_upload
):
input_file_name = submission_instance.submission_input_file.name
else:
input_file_name = submission_instance.input_file.name
user_annotation_file_path = join(
SUBMISSION_DATA_DIR.format(submission_id=submission_id),
os.path.basename(input_file_name),
)
run_submission(
challenge_id,
challenge_phase,
submission_instance,
user_annotation_file_path,
)
# Delete submission data after processing submission
delete_submission_data_directory(
SUBMISSION_DATA_DIR.format(submission_id=submission_id)
)
def process_add_challenge_message(message):
challenge_id = message.get("challenge_id")
try:
challenge = Challenge.objects.get(id=challenge_id)
except Challenge.DoesNotExist:
logger.exception(
"{} Challenge {} does not exist".format(
WORKER_LOGS_PREFIX, challenge_id
)
)
phases = challenge.challengephase_set.all()
extract_challenge_data(challenge, phases)
def process_submission_callback(body):
try:
logger.info(
"{} [x] Received submission message {}".format(
SUBMISSION_LOGS_PREFIX, body
)
)
body = yaml.safe_load(body)
body = dict((k, int(v)) for k, v in body.items())
process_submission_message(body)
except Exception as e:
logger.exception(
"{} Exception while receiving message from submission queue with error {}".format(
SUBMISSION_LOGS_PREFIX, e
)
)
def get_or_create_sqs_queue(queue_name, challenge=None):
"""
Returns:
Returns the SQS Queue object
"""
if settings.DEBUG or settings.TEST:
sqs = boto3.resource(
"sqs",
endpoint_url=os.environ.get("AWS_SQS_ENDPOINT", "http://sqs:9324"),
region_name=os.environ.get("AWS_DEFAULT_REGION", "us-east-1"),
aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY"),
aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID"),
)
else:
if challenge and challenge.use_host_sqs:
sqs = boto3.resource(
"sqs",
region_name=challenge.queue_aws_region,
aws_secret_access_key=challenge.aws_secret_access_key,
aws_access_key_id=challenge.aws_access_key_id,
)
else:
sqs = boto3.resource(
"sqs",
region_name=os.environ.get("AWS_DEFAULT_REGION", "us-east-1"),
aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY"),
aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID"),
)
if queue_name == "":
queue_name = "evalai_submission_queue"
# Check if the queue exists. If no, then create one
try:
queue = sqs.get_queue_by_name(QueueName=queue_name)
except botocore.exceptions.ClientError as ex:
if (
ex.response["Error"]["Code"]
!= "AWS.SimpleQueueService.NonExistentQueue"
):
logger.exception("Cannot get queue: {}".format(queue_name))
sqs_retention_period = SQS_RETENTION_PERIOD if challenge is None else str(challenge.sqs_retention_period)
queue = sqs.create_queue(
QueueName=queue_name,
Attributes={"MessageRetentionPeriod": sqs_retention_period},
)
return queue
def load_challenge_and_return_max_submissions(q_params):
try:
challenge = Challenge.objects.get(**q_params)
except Challenge.DoesNotExist:
logger.exception(
"{} Challenge with pk {} doesn't exist".format(
WORKER_LOGS_PREFIX, q_params["pk"]
)
)
raise
load_challenge(challenge)
maximum_concurrent_submissions = (
challenge.max_concurrent_submission_evaluation
)
return maximum_concurrent_submissions, challenge
def main():
killer = GracefulKiller()
logger.info(
"{} Using {} as temp directory to store data".format(
WORKER_LOGS_PREFIX, BASE_TEMP_DIR
)
)
delete_old_temp_directories()
create_dir_as_python_package(COMPUTE_DIRECTORY_PATH)
sys.path.append(COMPUTE_DIRECTORY_PATH)
q_params = {}
q_params["end_date__gt"] = timezone.now()
challenge_pk = os.environ.get("CHALLENGE_PK")
if challenge_pk:
q_params["pk"] = challenge_pk
if settings.DEBUG or settings.TEST:
if eval(LIMIT_CONCURRENT_SUBMISSION_PROCESSING):
if not challenge_pk:
logger.exception(
"{} Please add CHALLENGE_PK for the challenge to be loaded in the docker.env file.".format(
WORKER_LOGS_PREFIX
)
)
sys.exit(1)
(
maximum_concurrent_submissions,
challenge,
) = load_challenge_and_return_max_submissions(q_params)
else:
challenges = Challenge.objects.filter(**q_params)
for challenge in challenges:
load_challenge(challenge)
else:
(
maximum_concurrent_submissions,
challenge,
) = load_challenge_and_return_max_submissions(q_params)
# create submission base data directory
create_dir_as_python_package(SUBMISSION_DATA_BASE_DIR)
queue_name = os.environ.get("CHALLENGE_QUEUE", "evalai_submission_queue")
queue = get_or_create_sqs_queue(queue_name, challenge)
is_remote = int(challenge.remote_evaluation)
while True:
for message in queue.receive_messages():
if json.loads(message.body).get(
"is_static_dataset_code_upload_submission"
):
continue
if settings.DEBUG or settings.TEST:
if eval(LIMIT_CONCURRENT_SUBMISSION_PROCESSING):
current_running_submissions_count = (
Submission.objects.filter(
challenge_phase__challenge=challenge.id,
status="running",
).count()
)
if (
current_running_submissions_count
== maximum_concurrent_submissions
):
pass
else:
logger.info(
"{} Processing message body: {}".format(
WORKER_LOGS_PREFIX, message.body
)
)
process_submission_callback(message.body)
# Let the queue know that the message is processed
message.delete()
increment_and_push_metrics_to_statsd(queue_name, is_remote)
else:
logger.info(
"{} Processing message body: {}".format(
WORKER_LOGS_PREFIX, message.body
)
)
process_submission_callback(message.body)
# Let the queue know that the message is processed
message.delete()
increment_and_push_metrics_to_statsd(queue_name, is_remote)
else:
current_running_submissions_count = Submission.objects.filter(
challenge_phase__challenge=challenge.id, status="running"
).count()
if (
current_running_submissions_count
== maximum_concurrent_submissions
):
pass
else:
logger.info(
"{} Processing message body: {}".format(
WORKER_LOGS_PREFIX, message.body
)
)
process_submission_callback(message.body)
# Let the queue know that the message is processed
message.delete()
increment_and_push_metrics_to_statsd(queue_name, is_remote)
if killer.kill_now:
break
time.sleep(0.1)
if __name__ == "__main__":
main()
logger.info("{} Quitting Submission Worker.".format(WORKER_LOGS_PREFIX))