Cloud-CV/EvalAI
apps/challenges/challenge_config_utils.py
import logging
import re
import zipfile
from os.path import basename, isfile, join

import requests
import yaml
from yaml.scanner import ScannerError

from django.core.files.base import ContentFile
from rest_framework import status

from challenges.models import (
    Challenge,
    ChallengePhase,
    ChallengePhaseSplit,
    DatasetSplit,
    Leaderboard,
)

from .serializers import (
    ChallengePhaseCreateSerializer,
    DatasetSplitSerializer,
    LeaderboardSerializer,
    ZipChallengePhaseSplitSerializer,
    ZipChallengeSerializer,
)
from .utils import (
    get_file_content,
    get_missing_keys_from_dict,
    read_file_data_as_content_file,
)

logger = logging.getLogger(__name__)


def write_file(output_path, mode, file_content):
    with open(output_path, mode) as file:
        file.write(file_content)


def extract_zip_file(file_path, mode, output_path):
    zip_ref = zipfile.ZipFile(file_path, mode)
    zip_ref.extractall(output_path)
    logger.info("Zip file extracted to {}".format(output_path))
    zip_ref.close()
    return zip_ref
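# Example usage (a hedged sketch; "challenge_config.zip" and "/tmp/extracted"
# are hypothetical paths, not values used elsewhere in this module):
#
#     zip_ref = extract_zip_file("challenge_config.zip", "r", "/tmp/extracted")
#     print(zip_ref.namelist())  # namelist() still works on the closed handle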


def get_yaml_files_from_challenge_config(zip_ref):
    """
    Arguments:
        zip_ref {zipfile} -- reference to challenge config zip
    Returns:
        yaml_file_count {int} -- number of yaml files in zip file
        yaml_file_name {string} -- name of yaml file in the zip file
        extracted_folder_name {string} -- zip file extraction folder name
    """
    yaml_file_count = 0
    yaml_file_name = None
    extracted_folder_name = None
    for name in zip_ref.namelist():
        if (name.endswith(".yaml") or name.endswith(".yml")) and not name.startswith(
            "__MACOSX"
        ):  # Ignore YAML files inside the __MACOSX metadata directory
            yaml_file_name = name
            extracted_folder_name = yaml_file_name.split(
                basename(yaml_file_name)
            )[0]
            yaml_file_count += 1

    if not yaml_file_count:
        return yaml_file_count, None, None
    return yaml_file_count, yaml_file_name, extracted_folder_name


def read_yaml_file(file_path, mode):
    with open(file_path, mode) as stream:
        yaml_file_data = yaml.safe_load(stream)
    return yaml_file_data
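# Example (assuming a file "config.yaml" containing "title: My Challenge";
# the path is hypothetical):
#
#     data = read_yaml_file("config.yaml", "r")
#     data["title"]  # -> "My Challenge"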


def get_yaml_read_error(exc):
    """
    Arguments:
        exc {Exception} -- Exception object
    Returns:
        error_description {string} -- description of yaml read error
        line_number {int} -- line number of error field in yaml file
        column_number {int} -- column number of error field in yaml file
    """
    error_description = None
    line_number = None
    column_number = None
    # To get the problem description
    if hasattr(exc, "problem"):
        error_description = exc.problem
        # Capitalize the first letter of the problem description (default is lowercase)
        error_description = error_description.capitalize()
    # To get the error line and column number
    if hasattr(exc, "problem_mark"):
        mark = exc.problem_mark
        line_number = mark.line + 1
        column_number = mark.column + 1
    return error_description, line_number, column_number
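# Example (a sketch of the expected behavior for a malformed document):
#
#     try:
#         yaml.safe_load("title: [unclosed")
#     except yaml.YAMLError as exc:
#         description, line, column = get_yaml_read_error(exc)
#         # description is a capitalized parser message; line/column are 1-based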


def is_challenge_config_yaml_html_field_valid(
    yaml_file_data, key, base_location
):
    """
    Arguments:
        yaml_file_data {dict} -- challenge config yaml dict
        key {string} -- key of the validation field
        base_location {string} -- path of extracted config zip
    Returns:
        is_valid {boolean} -- flag for field validation is success
        message {string} -- error message if any
    """
    value = join(base_location, yaml_file_data.get(key))
    message = ""
    is_valid = False
    if value:
        if not isfile(value):
            message = "File at path {} not found. Please specify a valid file path".format(key)
        elif not value.endswith(
            ".html"
        ):
            message = "File {} is not a HTML file. Please specify a valid HTML file".format(key)
        else:
            is_valid = True
    else:
        message = "ERROR: There is no key for {} in YAML file".format(key)
    return is_valid, message
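# Example (hedged; assumes an extracted config at "/tmp/extracted" containing
# "description.html", both of which are hypothetical):
#
#     is_valid, message = is_challenge_config_yaml_html_field_valid(
#         {"description": "description.html"}, "description", "/tmp/extracted"
#     )
#     # is_valid is True and message is "" when the HTML file exists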


def is_challenge_phase_config_yaml_html_field_valid(
    yaml_file_data, key, base_location
):
    """
    Arguments:
        yaml_file_data {dict} -- challenge config yaml dict
        key {string} -- key of the validation field
        base_location {string} -- path of extracted config zip
    Returns:
        is_valid {boolean} -- flag for field validation is success
        message {string} -- error message if any
    """
    value = yaml_file_data.get(key)
    message = ""
    is_valid = False
    if value:
        is_valid = True
    else:
        message = " ERROR: There is no key for {} in phase {}.".format(
            key, yaml_file_data["name"]
        )
    return is_valid, message


def download_and_write_file(url, stream, output_path, mode):
    """
    Arguments:
        url {string} -- source file url
        stream {boolean} -- flag for download in stream mode
        output_path {string} -- path to write file
        model {string} -- access mode to write file
    Returns:
        is_success {boolean} -- flag for download and write is success
        message {string} -- error message if any
    """
    is_success = False
    message = None
    try:
        response = requests.get(url, stream=stream)
        try:
            if response and response.status_code == status.HTTP_200_OK:
                write_file(output_path, mode, response.content)
                is_success = True
        except IOError:
            message = (
                "Unable to process the uploaded zip file. " "Please try again!"
            )
    except requests.exceptions.RequestException:
        message = (
            "A server error occurred while processing zip file. "
            "Please try again!"
        )
    return is_success, message
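# Example usage (a sketch; the URL and output path are hypothetical):
#
#     is_success, message = download_and_write_file(
#         "https://example.com/config.zip", True, "/tmp/config.zip", "wb"
#     )
#     # is_success is True on an HTTP 200; message is set only if an
#     # I/O error or a request exception occurs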


def is_challenge_phase_split_mapping_valid(
    phase_ids, leaderboard_ids, dataset_split_ids, phase_split, challenge_phase_split_index
):
    """
    Arguments:
        phase_ids {array} -- list of phase ids
        leaderboard_ids {array} -- list of leaderboard ids
        dataset_split_ids {array} -- list of dataset split ids
        phase_split {dict} -- challenge phase split config
    Returns:
        is_success {boolean} -- flag for validation success
    """
    phase_id = phase_split["challenge_phase_id"]
    leaderboard_id = phase_split["leaderboard_id"]
    dataset_split_id = phase_split["dataset_split_id"]
    error_messages = []

    if leaderboard_id not in leaderboard_ids:
        error_messages.append("ERROR: Invalid leaderboard id {} found in challenge phase split {}.".format(leaderboard_id, challenge_phase_split_index))
    if phase_id not in phase_ids:
        error_messages.append("ERROR: Invalid phased id {} found in challenge phase split {}.".format(phase_id, challenge_phase_split_index))
    if dataset_split_id not in dataset_split_ids:
        error_messages.append("ERROR: Invalid dataset split id {} found in challenge phase split {}.".format(dataset_split_id, challenge_phase_split_index))

    if error_messages:
        return False, error_messages
    return True, error_messages
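# Example (illustrative ids only):
#
#     is_valid, errors = is_challenge_phase_split_mapping_valid(
#         phase_ids=[1, 2],
#         leaderboard_ids=[1],
#         dataset_split_ids=[1],
#         phase_split={
#             "challenge_phase_id": 3,
#             "leaderboard_id": 1,
#             "dataset_split_id": 1,
#         },
#         challenge_phase_split_index=1,
#     )
#     # is_valid is False; errors holds one message for the unknown phase id 3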


def get_value_from_field(data, base_location, field_name):
    file_path = join(base_location, data.get(field_name))
    field_value = None
    if file_path.endswith(".html") and isfile(file_path):
        field_value = get_file_content(file_path, "rb").decode("utf-8")
    return field_value


error_message_dict = {
    "no_yaml_file": "There is no YAML file in the zip file you uploaded!",
    "multiple_yaml_files": "There are {} challenge config YAML files instead of 1 in the zip file!",
    "yaml_file_read_error": "\n{} in line {}, column {}\n",
    "missing_challenge_title": "Please add the challenge title",
    "missing_challenge_description": "Please add the challenge description",
    "missing_evaluation_details": "Please add the evaluation details",
    "missing_terms_and_conditions": "Please add the terms and conditions.",
    "missing_submission_guidelines": "Please add the submission guidelines.",
    "missing_evaluation_script": "ERROR: No evaluation script is present in the zip file. Please add it and then try again!",
    "missing_evaluation_script_key": "ERROR: There is no key for the evaluation script in the YAML file. Please add it and then try again!",
    "missing_leaderboard_id": "ERROR: There is no leaderboard ID for the leaderboard.",
    "missing_leaderboard_schema": "ERROR: There is no leaderboard schema for the leaderboard with ID: {}.",
    "missing_leaderboard_default_order_by": "ERROR: There is no 'default_order_by' key in the schema for the leaderboard with ID: {}.",
    "missing_leaderboard_key": "ERROR: There is no key for the leaderboard in the YAML file. Please add it and then try again!",
    "incorrect_default_order_by": "ERROR: The 'default_order_by' value '{}' in the schema for the leaderboard with ID: {} is not a valid label.",
    "leaderboard_schema_error": "ERROR: The leaderboard with ID: {} has the following schema errors:\n {}",
    "leaderboard_additon_after_creation": "ERROR: The leaderboard with ID: {} doesn't exist. Addition of a new leaderboard after challenge creation is not allowed.",
    "leaderboard_deletion_after_creation": "ERROR: The leaderboard with ID: {} not found in config. Deletion of an existing leaderboard after challenge creation is not allowed.",
    "missing_leaderboard_labels": "ERROR: There is no 'labels' key in the schema for the leaderboard with ID: {}.",
    "missing_challenge_phases": "ERROR: No challenge phase key found. Please add challenge phases in the YAML file and try again!",
    "missing_challenge_phase_codename": "ERROR: No codename found for the challenge phase. Please add the codename and try again!",
    "missing_test_annotation_file": "ERROR: No test annotation file found in the zip file for challenge phase {}.",
    "missing_submission_meta_attribute_keys": "ERROR: Please enter the following keys to the submission meta attribute in challenge phase {}: {}",
    "invalid_submission_meta_attribute_types": "ERROR: Please ensure that the submission meta attribute types for the attribute '{}' in challenge phase {} are among the following: boolean, text, radio, or checkbox.",
    "missing_challenge_phase_id": "ERROR: Challenge phase {} doesn't exist. Addition of a new challenge phase after challenge creation is not allowed.",
    "missing_challenge_phase_id_config": "ERROR: Challenge phase {} doesn't exist. Addition of a new challenge phase after challenge creation is not allowed.",
    "missing_leaderboard_id_config": "ERROR: The leaderboard with ID: {} doesn't exist. Addition of a new leaderboard after challenge creation is not allowed.",
    "missing_existing_leaderboard_id": "ERROR: The leaderboard with ID: {} was not found in the configuration. Deletion of an existing leaderboard after challenge creation is not allowed.",
    "missing_existing_challenge_phase_id": "ERROR: Challenge phase {} was not found in the configuration. Deletion of an existing challenge phase after challenge creation is not allowed.",
    "missing_dataset_splits_key": "ERROR: There is no key for dataset splits.",
    "missing_dataset_split_name": "ERROR: There is no name for dataset split {}.",
    "missing_dataset_split_codename": "ERROR: There is no codename for dataset split {}.",
    "duplicate_dataset_split_codename": "ERROR: Duplicate codename {} for dataset split {}. Please ensure codenames are unique.",
    "dataset_split_schema_errors": "ERROR: Dataset split {} has the following schema errors:\n {}",
    "dataset_split_addition": "ERROR: Dataset split {} doesn't exist. Addition of a new dataset split after challenge creation is not allowed.",
    "missing_existing_dataset_split_id": "ERROR: Dataset split {} not found in config. Deletion of existing dataset split after challenge creation is not allowed.",
    "challenge_phase_split_not_exist": "ERROR: Challenge phase split (leaderboard_id: {}, challenge_phase_id: {}, dataset_split_id: {}) doesn't exist. Addition of challenge phase split after challenge creation is not allowed.",
    "challenge_phase_split_schema_errors": "ERROR: Challenge phase split {} has the following schema errors:\n {}",
    "missing_keys_in_challenge_phase_splits": "ERROR: The following keys are missing in the challenge phase splits of YAML file (phase_split: {}): {}",
    "challenge_phase_split_not_found": "ERROR: Challenge phase split (leaderboard_id: {}, challenge_phase_id: {}, dataset_split_id: {}) not found in config. Deletion of existing challenge phase split after challenge creation is not allowed.",
    "no_key_for_challenge_phase_splits": "ERROR: There is no key for challenge phase splits.",
    "no_codename_for_challenge_phase": "ERROR: No codename found for the challenge phase. Please add a codename and try again!",
    "duplicate_codename_for_phase": "ERROR: Duplicate codename {} for phase {}. Please ensure codenames are unique",
    "no_test_annotation_file_found": "ERROR: No test annotation file found in the zip file for challenge phase {}",
    "submission_meta_attribute_option_missing": "ERROR: Please include at least one option in the attribute for challenge phase {}",
    "missing_submission_meta_attribute_fields": "ERROR: Please enter the following fields for the submission meta attribute in challenge phase {}: {}",
    "challenge_phase_schema_errors": "ERROR: Challenge phase {} has the following schema errors:\n {}",
    "challenge_phase_addition": "ERROR: Challenge phase {} doesn't exist. Addition of a new challenge phase after challenge creation is not allowed.",
    "challenge_phase_not_found": "ERROR: Challenge phase {} not found in config. Deletion of existing challenge phase after challenge creation is not allowed.",
    "is_submission_public_restricted": "ERROR: is_submission_public can't be 'True' for challenge phase '{}' with is_restricted_to_select_one_submission 'True'. Please change is_submission_public to 'False' and try again!",
    "missing_option_in_submission_meta_attribute": "ERROR: Please include at least one option in the attribute for challenge phase {}",
    "missing_fields_in_submission_meta_attribute": "ERROR: Please enter the following fields for the submission meta attribute in challenge phase {}: {}",
    "missing_date": "ERROR: Please add the start_date and end_date.",
    "start_date_greater_than_end_date": "ERROR: Start date cannot be greater than end date.",
    "missing_dates_challenge_phase": "ERROR: Please add the start_date and end_date in challenge phase {}.",
    "start_date_greater_than_end_date_challenge_phase": "ERROR: Start date cannot be greater than end date in challenge phase {}.",
    "extra_tags": "ERROR: Tags are limited to 4. Please remove extra tags then try again!",
    "wrong_domain": "ERROR: Domain name is incorrect. Please enter correct domain name then try again!",
    "duplicate_combinations_in_challenge_phase_splits": "ERROR: Duplicate combinations of leaderboard_id {}, challenge_phase_id {} and dataset_split_id {} found in challenge phase splits.",
    "sponsor_not_found": "ERROR: Sponsor name or url not found in YAML data.",
    "prize_not_found": "ERROR: Prize rank or amount not found in YAML data.",
    "duplicate_rank": "ERROR: Duplicate rank {} found in YAML data.",
    "prize_amount_wrong": "ERROR: Invalid amount value {}. Amount should be in decimal format with three-letter currency code (e.g. 100.00USD, 500EUR, 1000INR).",
    "prize_rank_wrong": "ERROR: Invalid rank value {}. Rank should be an integer.",
    "challenge_metadata_schema_errors": "ERROR: Unable to serialize the challenge because of the following errors: {}.",
    "evaluation_script_not_zip": "ERROR: Please pass in a zip file as evaluation script. If using the `evaluation_script` directory (recommended), it should be `evaluation_script.zip`.",
}
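# The templates above are looked up by key and filled in with str.format, e.g.:
#
#     error_message_dict["missing_dataset_split_name"].format(2)
#     # -> "ERROR: There is no name for dataset split 2."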


class ValidateChallengeConfigUtil:
    def __init__(
        self,
        request,
        challenge_host_team,
        base_location,
        unique_folder_name,
        zip_ref,
        current_challenge,
    ):
        """
        Class containing methods to validate the challenge configuration

        Arguments:
            request {HttpRequest} -- The request object
            challenge_host_team {int} -- the team creating the challenge
            base_location {str} -- The temp base directory for storing all the files and folders while validating the zip file
            unique_folder_name {str} -- name of the challenge zip file and the parent dir of extracted folder
            zip_ref {zipfile.ZipFile} -- reference to challenge config zip
            current_challenge {apps.challenges.models.Challenge} -- the existing challenge for the github repo, if any
        """
        self.request = request
        self.challenge_host_team = challenge_host_team
        self.base_location = base_location
        self.unique_folder_name = unique_folder_name
        self.zip_ref = zip_ref
        self.current_challenge = current_challenge

        self.error_messages = []
        self.files = {}
        self.yaml_file_data = None
        self.error_messages_dict = error_message_dict

        (
            self.yaml_file_count,
            self.yaml_file,
            self.extracted_folder_name,
        ) = get_yaml_files_from_challenge_config(self.zip_ref)

        self.valid_yaml = self.read_and_validate_yaml()
        if self.valid_yaml:
            self.challenge_config_location = join(
                self.base_location,
                self.unique_folder_name,
                self.extracted_folder_name,
            )
        self.phase_ids = []
        self.leaderboard_ids = []

    def read_and_validate_yaml(self):
        if not self.yaml_file_count:
            message = self.error_messages_dict.get("no_yaml_file")
            self.error_messages.append(message)
            return False

        if self.yaml_file_count > 1:
            message = self.error_messages_dict.get(
                "multiple_yaml_files"
            ).format(self.yaml_file_count)
            self.error_messages.append(message)
            return False

        # YAML Read Error
        try:
            self.yaml_file_path = join(
                self.base_location, self.unique_folder_name, self.yaml_file
            )
            self.yaml_file_data = read_yaml_file(self.yaml_file_path, "r")
            return True
        except (yaml.YAMLError, ScannerError) as exc:
            (
                error_description,
                line_number,
                column_number,
            ) = get_yaml_read_error(exc)
            message = self.error_messages_dict.get(
                "yaml_file_read_error"
            ).format(error_description, line_number, column_number)
            self.error_messages.append(message)
            return False

    def validate_challenge_title(self):
        challenge_title = self.yaml_file_data.get("title")
        if not challenge_title:
            message = self.error_messages_dict.get("missing_challenge_title")
            self.error_messages.append(message)

    def validate_challenge_logo(self):
        image = self.yaml_file_data.get("image")
        if image and (
            image.endswith(".jpg")
            or image.endswith(".jpeg")
            or image.endswith(".png")
        ):
            self.challenge_image_path = join(
                self.base_location,
                self.unique_folder_name,
                self.extracted_folder_name,
                image,
            )

            if isfile(self.challenge_image_path):
                self.challenge_image_file = ContentFile(
                    get_file_content(self.challenge_image_path, "rb"), image
                )
            else:
                self.challenge_image_file = None
        else:
            self.challenge_image_file = None
        self.files["challenge_image_file"] = self.challenge_image_file

    def validate_challenge_description(self):
        challenge_description = self.yaml_file_data.get("description")
        if not challenge_description:
            message = self.error_messages_dict.get(
                "missing_challenge_description"
            )
            self.error_messages.append(message)
        else:
            is_valid, message = is_challenge_config_yaml_html_field_valid(
                self.yaml_file_data,
                "description",
                self.challenge_config_location,
            )
            if not is_valid:
                self.error_messages.append(message)
            else:
                self.yaml_file_data["description"] = get_value_from_field(
                    self.yaml_file_data,
                    self.challenge_config_location,
                    "description",
                )

    # Check for evaluation details file
    def validate_evaluation_details_file(self):
        evaluation_details = self.yaml_file_data.get("evaluation_details")
        if not evaluation_details:
            message = self.error_messages_dict.get(
                "missing_evaluation_details"
            )
            self.error_messages.append(message)
        else:
            is_valid, message = is_challenge_config_yaml_html_field_valid(
                self.yaml_file_data,
                "evaluation_details",
                self.challenge_config_location,
            )
            if not is_valid:
                self.error_messages.append(message)
            else:
                self.yaml_file_data[
                    "evaluation_details"
                ] = get_value_from_field(
                    self.yaml_file_data,
                    self.challenge_config_location,
                    "evaluation_details",
                )

    # Validate terms and conditions file
    def validate_terms_and_conditions_file(self):
        terms_and_conditions = self.yaml_file_data.get("terms_and_conditions")
        if not terms_and_conditions:
            message = self.error_messages_dict.get(
                "missing_terms_and_conditions"
            )
            self.error_messages.append(message)
        else:
            is_valid, message = is_challenge_config_yaml_html_field_valid(
                self.yaml_file_data,
                "terms_and_conditions",
                self.challenge_config_location,
            )
            if not is_valid:
                self.error_messages.append(message)
            else:
                self.yaml_file_data[
                    "terms_and_conditions"
                ] = get_value_from_field(
                    self.yaml_file_data,
                    self.challenge_config_location,
                    "terms_and_conditions",
                )

    # Validate submission guidelines file
    def validate_submission_guidelines_file(self):
        submission_guidelines = self.yaml_file_data.get(
            "submission_guidelines"
        )
        if not submission_guidelines:
            message = self.error_messages_dict.get(
                "missing_submission_guidelines"
            )
            self.error_messages.append(message)
        else:
            is_valid, message = is_challenge_config_yaml_html_field_valid(
                self.yaml_file_data,
                "submission_guidelines",
                self.challenge_config_location,
            )
            if not is_valid:
                self.error_messages.append(message)
            else:
                self.yaml_file_data[
                    "submission_guidelines"
                ] = get_value_from_field(
                    self.yaml_file_data,
                    self.challenge_config_location,
                    "submission_guidelines",
                )

    def validate_evaluation_script_file(self):
        evaluation_script = self.yaml_file_data.get("evaluation_script")
        if evaluation_script:
            if not evaluation_script.endswith(".zip"):
                message = self.error_messages_dict.get(
                    "evaluation_script_not_zip"
                )
                self.error_messages.append(message)
            else:
                evaluation_script_path = join(
                    self.challenge_config_location, evaluation_script
                )
                # Check for evaluation script file in extracted zip folder
                if isfile(evaluation_script_path):
                    self.challenge_evaluation_script_file = (
                        read_file_data_as_content_file(
                            evaluation_script_path, "rb", evaluation_script_path
                        )
                    )
                    self.files[
                        "challenge_evaluation_script_file"
                    ] = self.challenge_evaluation_script_file
                else:
                    message = self.error_messages_dict.get(
                        "missing_evaluation_script"
                    )
                    self.error_messages.append(message)
        else:
            message = self.error_messages_dict.get(
                "missing_evaluation_script_key"
            )
            self.error_messages.append(message)

    def validate_dates(self):
        start_date = self.yaml_file_data.get("start_date")
        end_date = self.yaml_file_data.get("end_date")

        if not start_date or not end_date:
            message = self.error_messages_dict.get("missing_date")
            self.error_messages.append(message)
        if start_date and end_date:
            if start_date > end_date:
                message = self.error_messages_dict.get(
                    "start_date_greater_than_end_date"
                )
                self.error_messages.append(message)

    def validate_serializer(self):
        if not self.error_messages:
            serializer = ZipChallengeSerializer(
                data=self.yaml_file_data,
                context={
                    "request": self.request,
                    "challenge_host_team": self.challenge_host_team,
                    "image": self.challenge_image_file,
                    "evaluation_script": self.challenge_evaluation_script_file,
                    "github_repository": self.request.data[
                        "GITHUB_REPOSITORY"
                    ],
                },
            )
            if not serializer.is_valid():
                message = self.error_messages_dict[
                    "challenge_metadata_schema_errors"
                ].format(str(serializer.errors))
                self.error_messages.append(message)

    # Check for leaderboards
    def validate_leaderboards(self, current_leaderboard_config_ids):
        leaderboard = self.yaml_file_data.get("leaderboard")
        if leaderboard:
            for data in leaderboard:
                error = False
                if "id" not in data:
                    message = self.error_messages_dict.get(
                        "missing_leaderboard_id"
                    )
                    self.error_messages.append(message)
                    error = True
                if "schema" not in data:
                    message = self.error_messages_dict.get(
                        "missing_leaderboard_schema"
                    ).format(data.get("id"))
                    self.error_messages.append(message)
                    error = True
                else:
                    if "labels" not in data["schema"]:
                        message = self.error_messages_dict.get(
                            "missing_leaderboard_labels"
                        ).format(data.get("id"))
                        self.error_messages.append(message)
                        error = True
                    if "default_order_by" not in data["schema"]:
                        message = self.error_messages_dict.get(
                            "missing_leaderboard_default_order_by"
                        ).format(data.get("id"))
                        self.error_messages.append(message)
                        error = True
                    else:
                        default_order_by = data["schema"]["default_order_by"]
                        if (
                            "labels" in data["schema"]
                            and default_order_by
                            not in data["schema"]["labels"]
                        ):
                            message = self.error_messages_dict.get(
                                "incorrect_default_order_by"
                            ).format(default_order_by, data.get("id"))
                            self.error_messages.append(message)
                            error = True
                if not error:
                    serializer = LeaderboardSerializer(
                        data=data, context={"config_id": data["id"]}
                    )
                    if not serializer.is_valid():
                        serializer_error = str(serializer.errors)
                        message = self.error_messages_dict.get(
                            "leaderboard_schema_error"
                        ).format(data["id"], serializer_error)
                        self.error_messages.append(message)
                    else:
                        if (
                            current_leaderboard_config_ids
                            and int(data["id"])
                            not in current_leaderboard_config_ids
                        ):
                            message = self.error_messages_dict.get(
                                "leaderboard_addition_after_creation"
                            ).format(data["id"])
                            self.error_messages.append(message)
                        self.leaderboard_ids.append(data["id"])
        else:
            message = self.error_messages_dict.get("missing_leaderboard_key")
            self.error_messages.append(message)

        for current_leaderboard_id in current_leaderboard_config_ids:
            if current_leaderboard_id not in self.leaderboard_ids:
                message = self.error_messages_dict.get(
                    "leaderboard_deletion_after_creation"
                ).format(current_leaderboard_id)
                self.error_messages.append(message)

    # Check for challenge phases
    def validate_challenge_phases(self, current_phase_config_ids):
        challenge_phases_data = self.yaml_file_data.get("challenge_phases")
        if not challenge_phases_data:
            message = self.error_messages_dict["missing_challenge_phases"]
            self.error_messages.append(message)
            return self.error_messages, self.yaml_file_data, self.files

        self.phase_ids = []
        phase_codenames = []
        self.files["challenge_test_annotation_files"] = []
        for data in challenge_phases_data:
            if "codename" not in data:
                self.error_messages.append(
                    self.error_messages_dict["no_codename_for_challenge_phase"]
                )
            else:
                if data["codename"] not in phase_codenames:
                    phase_codenames.append(data["codename"])
                else:
                    message = self.error_messages_dict[
                        "duplicate_codename_for_phase"
                    ].format(data["codename"], data["name"])
                    self.error_messages.append(message)
            test_annotation_file = data.get("test_annotation_file")
            if test_annotation_file:
                test_annotation_file_path = join(
                    self.challenge_config_location, test_annotation_file
                )
                if isfile(test_annotation_file_path):
                    challenge_test_annotation_file = (
                        read_file_data_as_content_file(
                            test_annotation_file_path,
                            "rb",
                            test_annotation_file_path,
                        )
                    )
                    self.files["challenge_test_annotation_files"].append(
                        challenge_test_annotation_file
                    )
                else:
                    message = self.error_messages_dict[
                        "no_test_annotation_file_found"
                    ].format(data["name"])
                    self.error_messages.append(message)
            else:
                test_annotation_file_path = None
                self.files["challenge_test_annotation_files"].append(None)

            if data.get("max_submissions_per_month", None) is None:
                data["max_submissions_per_month"] = data.get(
                    "max_submissions", None
                )

            (
                is_valid,
                message,
            ) = is_challenge_phase_config_yaml_html_field_valid(
                data, "description", self.challenge_config_location
            )
            if not is_valid:
                self.error_messages.append(message)
            else:
                data["description"] = get_value_from_field(
                    data, self.challenge_config_location, "description"
                )

            if data.get("is_submission_public") and data.get(
                "is_restricted_to_select_one_submission"
            ):
                message = self.error_messages_dict[
                    "is_submission_public_restricted"
                ].format(data["name"])
                self.error_messages.append(message)

            start_date = data.get("start_date")
            end_date = data.get("end_date")
            if not start_date or not end_date:
                message = self.error_messages_dict.get(
                    "missing_dates_challenge_phase"
                ).format(data.get("id"))
                self.error_messages.append(message)
            if start_date and end_date:
                if start_date > end_date:
                    message = self.error_messages_dict.get(
                        "start_date_greater_than_end_date_challenge_phase"
                    ).format(data.get("id"))
                    self.error_messages.append(message)

            # To ensure that the schema for submission meta attributes is valid
            if data.get("submission_meta_attributes"):
                for attribute in data["submission_meta_attributes"]:
                    keys = ["name", "description", "type"]
                    missing_keys = get_missing_keys_from_dict(attribute, keys)

                    if len(missing_keys) == 0:
                        valid_attribute_types = [
                            "boolean",
                            "text",
                            "radio",
                            "checkbox",
                        ]
                        attribute_type = attribute["type"]
                        if attribute_type in valid_attribute_types:
                            if (
                                attribute_type == "radio"
                                or attribute_type == "checkbox"
                            ):
                                options = attribute.get("options")
                                if not options:
                                    message = self.error_messages_dict[
                                        "missing_option_in_submission_meta_attribute"
                                    ].format(data["id"])
                                    self.error_messages.append(message)
                        else:
                            message = self.error_messages_dict[
                                "invalid_submission_meta_attribute_types"
                            ].format(attribute_type, data["id"])
                            self.error_messages.append(message)
                    else:
                        missing_keys_string = ", ".join(missing_keys)
                        message = self.error_messages_dict[
                            "missing_fields_in_submission_meta_attribute"
                        ].format(data["id"], missing_keys_string)
                        self.error_messages.append(message)

            if test_annotation_file_path is not None and isfile(
                test_annotation_file_path
            ):
                serializer = ChallengePhaseCreateSerializer(
                    data=data,
                    context={
                        "exclude_fields": ["challenge"],
                        "test_annotation": challenge_test_annotation_file,
                        "config_id": data["id"],
                    },
                )
            else:
                serializer = ChallengePhaseCreateSerializer(
                    data=data,
                    context={
                        "exclude_fields": ["challenge"],
                        "config_id": data["id"],
                    },
                )
            if not serializer.is_valid():
                serializer_error = str(serializer.errors)
                message = self.error_messages_dict[
                    "challenge_phase_schema_errors"
                ].format(data["id"], serializer_error)
                self.error_messages.append(message)
            else:
                if (
                    current_phase_config_ids
                    and int(data["id"]) not in current_phase_config_ids
                ):
                    message = self.error_messages_dict[
                        "challenge_phase_addition"
                    ].format(data["id"])
                    self.error_messages.append(message)
                self.phase_ids.append(data["id"])

        for current_challenge_phase_id in current_phase_config_ids:
            if current_challenge_phase_id not in self.phase_ids:
                message = self.error_messages_dict[
                    "challenge_phase_not_found"
                ].format(current_challenge_phase_id)
                self.error_messages.append(message)

    def validate_challenge_phase_splits(self, current_phase_split_ids):
        challenge_phase_splits = self.yaml_file_data.get(
            "challenge_phase_splits"
        )
        # Check for duplicate combinations; `challenge_phase_splits` may be
        # None when the key is missing, which is reported further below
        challenge_phase_split_set = set()
        total_duplicates = []

        for data in challenge_phase_splits or []:
            combination = (
                data["leaderboard_id"],
                data["challenge_phase_id"],
                data["dataset_split_id"],
            )
            if combination in challenge_phase_split_set:
                total_duplicates.append(combination)
            else:
                challenge_phase_split_set.add(combination)

        for combination in total_duplicates:
            message = self.error_messages_dict[
                "duplicate_combinations_in_challenge_phase_splits"
            ].format(combination[0], combination[1], combination[2])
            self.error_messages.append(message)

        challenge_phase_split_uuids = []
        if challenge_phase_splits:
            phase_split = 1
            exclude_fields = [
                "challenge_phase",
                "dataset_split",
                "leaderboard",
            ]
            for data in challenge_phase_splits:
                expected_keys = {
                    "is_leaderboard_order_descending",
                    "leaderboard_decimal_precision",
                    "visibility",
                    "dataset_split_id",
                    "leaderboard_id",
                    "challenge_phase_id",
                }
                if expected_keys.issubset(data.keys()):
                    if (
                        current_phase_split_ids
                        and (
                            data["leaderboard_id"],
                            data["challenge_phase_id"],
                            data["dataset_split_id"],
                        )
                        not in current_phase_split_ids
                    ):
                        message = self.error_messages_dict[
                            "challenge_phase_split_not_exist"
                        ].format(
                            data["leaderboard_id"],
                            data["challenge_phase_id"],
                            data["dataset_split_id"],
                        )
                        self.error_messages.append(message)
                    else:
                        challenge_phase_split_uuids.append(
                            (
                                data["leaderboard_id"],
                                data["challenge_phase_id"],
                                data["dataset_split_id"],
                            )
                        )

                    (
                        is_mapping_valid,
                        messages,
                    ) = is_challenge_phase_split_mapping_valid(
                        self.phase_ids,
                        self.leaderboard_ids,
                        self.dataset_splits_ids,
                        data,
                        phase_split,
                    )
                    self.error_messages += messages

                    serializer = ZipChallengePhaseSplitSerializer(
                        data=data, context={"exclude_fields": exclude_fields}
                    )
                    if not serializer.is_valid():
                        serializer_error = str(serializer.errors)
                        message = self.error_messages_dict[
                            "challenge_phase_split_schema_errors"
                        ].format(phase_split, serializer_error)
                        self.error_messages.append(message)
                else:
                    missing_keys = expected_keys - data.keys()
                    missing_keys_string = ", ".join(missing_keys)
                    message = self.error_messages_dict[
                        "missing_keys_in_challenge_phase_splits"
                    ].format(phase_split, missing_keys_string)
                    self.error_messages.append(message)
                # Advance the 1-based split index for every entry, valid or
                # not, so later error messages reference the correct split
                phase_split += 1
            for uuid in current_phase_split_ids:
                if uuid not in challenge_phase_split_uuids:
                    message = self.error_messages_dict[
                        "challenge_phase_split_not_found"
                    ].format(uuid[0], uuid[1], uuid[2])
                    self.error_messages.append(message)
        else:
            message = self.error_messages_dict[
                "no_key_for_challenge_phase_splits"
            ]
            self.error_messages.append(message)

    # Check for dataset splits
    def validate_dataset_splits(self, current_dataset_config_ids):
        dataset_splits = self.yaml_file_data.get("dataset_splits")
        dataset_split_codenames = []
        self.dataset_splits_ids = []
        if dataset_splits:
            for split in dataset_splits:
                name = split.get("name")
                if not name:
                    message = self.error_messages_dict[
                        "missing_dataset_split_name"
                    ].format(split.get("id"))
                    self.error_messages.append(message)
                if "codename" not in split:
                    message = self.error_messages_dict[
                        "missing_dataset_split_codename"
                    ].format(split.get("id"))
                    self.error_messages.append(message)
                else:
                    if split["codename"] not in dataset_split_codenames:
                        dataset_split_codenames.append(split["codename"])
                    else:
                        message = self.error_messages_dict[
                            "duplicate_dataset_split_codename"
                        ].format(split["codename"], split["name"])
                        self.error_messages.append(message)
            for split in dataset_splits:
                serializer = DatasetSplitSerializer(
                    data=split, context={"config_id": split["id"]}
                )
                if not serializer.is_valid():
                    serializer_error = str(serializer.errors)
                    message = self.error_messages_dict[
                        "dataset_split_schema_errors"
                    ].format(split["id"], serializer_error)
                    self.error_messages.append(message)
                else:
                    if (
                        current_dataset_config_ids
                        and int(split["id"]) not in current_dataset_config_ids
                    ):
                        message = self.error_messages_dict[
                            "dataset_split_addition"
                        ].format(split["id"])
                        self.error_messages.append(message)
                    self.dataset_splits_ids.append(split["id"])
        else:
            message = self.error_messages_dict["missing_dataset_splits_key"]
            self.error_messages.append(message)

        for current_dataset_split_config_id in current_dataset_config_ids:
            if current_dataset_split_config_id not in self.dataset_splits_ids:
                message = self.error_messages_dict[
                    "missing_existing_dataset_split_id"
                ].format(current_dataset_split_config_id)
                self.error_messages.append(message)

    # Check for Tags and Domain
    def check_tags(self):
        if "tags" in self.yaml_file_data:
            tags_data = self.yaml_file_data["tags"]
            # Verify Tags are limited to 4
            if len(tags_data) > 4:
                message = self.error_messages_dict["extra_tags"]
                self.error_messages.append(message)

    def check_domain(self):
        # Verify Domain name is correct
        if "domain" in self.yaml_file_data:
            domain_value = self.yaml_file_data["domain"]
            domain_choice = [option[0] for option in Challenge.DOMAIN_OPTIONS]
            if domain_value not in domain_choice:
                message = self.error_messages_dict["wrong_domain"]
                self.error_messages.append(message)

    def check_sponsor(self):
        # Verify Sponsor is correct
        if "sponsors" in self.yaml_file_data:
            for sponsor in self.yaml_file_data["sponsors"]:
                if 'name' not in sponsor or 'website' not in sponsor:
                    message = self.error_messages_dict["sponsor_not_found"]
                    self.error_messages.append(message)

    def check_prizes(self):
        # Verify Prizes are correct
        if "prizes" in self.yaml_file_data:
            rank_set = set()
            for prize in self.yaml_file_data["prizes"]:
                if 'rank' not in prize or 'amount' not in prize:
                    message = self.error_messages_dict["prize_not_found"]
                    self.error_messages.append(message)
                # Check for duplicate rank.
                rank = prize['rank']
                if rank in rank_set:
                    message = self.error_messages_dict["duplicate_rank"].format(rank)
                    self.error_messages.append(message)
                rank_set.add(rank)
                if not isinstance(rank, int) or rank < 1:
                    message = self.error_messages_dict["prize_rank_wrong"].format(rank)
                    self.error_messages.append(message)
                if not re.match(r'^\d+(\.\d{1,2})?[A-Z]{3}$', prize["amount"]):
                    message = self.error_messages_dict["prize_amount_wrong"].format(prize["amount"])
                    self.error_messages.append(message)
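    # Amount format examples (illustrative values, not from any config):
    # "100.00USD" and "500EUR" match the pattern above, while "$100",
    # "100USD50" and "100.123USD" do not: the regex requires digits, an
    # optional fraction of at most two decimals, then a 3-letter currency code.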


def validate_challenge_config_util(
    request,
    challenge_host_team,
    BASE_LOCATION,
    unique_folder_name,
    zip_ref,
    current_challenge,
):
    """
    Function to validate a challenge config

    Arguments:
        request {HttpRequest} -- The request object
        BASE_LOCATION {str} -- The temp base directory for storing all the files and folders while validating the zip file
        unique_folder_name {str} -- name of the challenge zip file and the parent dir of extracted folder
        zip_ref {zipfile.ZipFile} -- reference to challenge config zip
        current_challenge {apps.challenges.models.Challenge} - the existing challenge for the github repo, if any
    """

    val_config_util = ValidateChallengeConfigUtil(
        request,
        challenge_host_team,
        BASE_LOCATION,
        unique_folder_name,
        zip_ref,
        current_challenge,
    )
    if not val_config_util.valid_yaml:
        return (
            val_config_util.error_messages,
            val_config_util.yaml_file_data,
            val_config_util.files,
        )

    # Validate challenge title
    val_config_util.validate_challenge_title()

    # Validate challenge logo
    val_config_util.validate_challenge_logo()

    # Validate challenge description
    val_config_util.validate_challenge_description()

    # Validate evaluation details
    val_config_util.validate_evaluation_details_file()

    # Validate terms and conditions
    val_config_util.validate_terms_and_conditions_file()

    # Validate submission guidelines
    val_config_util.validate_submission_guidelines_file()

    # Validate evaluation script
    val_config_util.validate_evaluation_script_file()

    val_config_util.validate_dates()

    val_config_util.validate_serializer()

    # Get existing config IDs for leaderboards and dataset splits
    if current_challenge:
        current_challenge_phases = ChallengePhase.objects.filter(
            challenge=current_challenge.id
        )
        current_challenge_phase_splits = ChallengePhaseSplit.objects.filter(
            challenge_phase__in=current_challenge_phases
        )
        current_leaderboards = Leaderboard.objects.filter(
            id__in=current_challenge_phase_splits.values("leaderboard")
        )
        current_dataset_splits = DatasetSplit.objects.filter(
            id__in=current_challenge_phase_splits.values("dataset_split")
        )

        current_leaderboard_config_ids = [
            int(x.config_id) for x in current_leaderboards
        ]
        current_dataset_config_ids = [
            int(x.config_id) for x in current_dataset_splits
        ]
        current_phase_config_ids = [
            int(x.config_id) for x in current_challenge_phases
        ]
        current_phase_split_ids = [
            (
                split.leaderboard.config_id,
                split.challenge_phase.config_id,
                split.dataset_split.config_id,
            )
            for split in current_challenge_phase_splits
        ]
    else:
        current_leaderboard_config_ids = []
        current_dataset_config_ids = []
        current_phase_config_ids = []
        current_phase_split_ids = []

    # Validate leaderboards
    val_config_util.validate_leaderboards(current_leaderboard_config_ids)

    # Validate challenge phases
    val_config_util.validate_challenge_phases(current_phase_config_ids)

    # Validate dataset splits
    val_config_util.validate_dataset_splits(current_dataset_config_ids)

    # Validate challenge phase splits
    val_config_util.validate_challenge_phase_splits(current_phase_split_ids)

    # Validate tags
    val_config_util.check_tags()

    # Validate domain
    val_config_util.check_domain()
    # Check for Sponsor
    # val_config_util.check_sponsor()

    # Check for Prize
    val_config_util.check_prizes()

    return (
        val_config_util.error_messages,
        val_config_util.yaml_file_data,
        val_config_util.files,
    )
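

# End-to-end usage (a hedged sketch; the paths and folder name are
# hypothetical, and the request/team objects come from the calling view):
#
#     zip_ref = extract_zip_file("/tmp/abc123.zip", "r", "/tmp/abc123/")
#     errors, yaml_data, files = validate_challenge_config_util(
#         request, challenge_host_team, "/tmp", "abc123", zip_ref, None
#     )
#     if errors:
#         ...  # surface the messages to the challenge host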