flxwu/contest.io

View on GitHub
server/__init__.py

Summary

Maintainability
D
1 day
Test Coverage
from flask import Flask, jsonify, render_template,send_from_directory,request, g, session, redirect, url_for, flash
from flask_cors import CORS
from werkzeug.routing import BaseConverter
from http import HTTPStatus
import json
import requests
import flask_github
import server.api.api_connector as api_connector
import server.database.models as models
import server.settings as settings


class RegexConverter(BaseConverter):
    """URL converter whose matching pattern is supplied in the route itself.

    The first converter argument given in the route rule becomes the regular
    expression used to match that URL segment.
    """

    def __init__(self, urlMap, *items):
        super().__init__(urlMap)
        # Only the first argument is used; any further ones are ignored.
        self.regex = items[0]


def get_queryparam(p: str):  # pylint: disable=invalid-name
    """Return query-string parameter *p*, or None when it is absent or empty."""
    value = request.args.get(p)
    return value or None


# The Flask application. Static files and templates are both looked up in
# ../dist (presumably the built front-end bundle -- confirm against the build).
app = Flask(__name__,  # pylint: disable=invalid-name
            static_folder="../dist",
            template_folder="../dist")

# CORS: allow requests from any origin, but only on the /api/* routes.
cors = CORS(app, resources={r"/api/*": {"origins": "*"}}  # pylint:disable=invalid-name
            )
# Flask app config: GitHub OAuth credentials and the Flask secret key are
# taken from server.settings.
app.config['GITHUB_CLIENT_ID'] = settings.GITHUB_CLIENT_ID
app.config['GITHUB_CLIENT_SECRET'] = settings.GITHUB_CLIENT_SECRET
app.secret_key = settings.SECRET_KEY
# NOTE(review): SESSION_TYPE looks like a Flask-Session setting, but no
# Flask-Session import is visible in this file -- confirm it has any effect.
app.config['SESSION_TYPE'] = 'filesystem'

# Register the converter so routes can use '<regex("..."):name>' segments.
app.url_map.converters['regex'] = RegexConverter

# Set Endpoints: connector objects used by the /api/tasks and
# /api/contest.results views below.
TasksEndpoint = api_connector.Tasks()  # pylint: disable=invalid-name
UserContestResultsEndpoint = api_connector.UserContestResults() # pylint: disable=invalid-name  

# Github-Flask: OAuth helper bound to the app; it must be created after the
# GITHUB_* config values above have been set.
github = flask_github.GitHub(app)  # pylint: disable=invalid-name


@app.before_request
def before_request():
    """Populate ``g.user`` for the current request.

    ``g.user`` is reset to None, then replaced by the result of
    ``models.select_user`` when the session carries a 'user_id'.
    """
    g.user = None
    if 'user_id' not in session:
        return
    userCondition = '{}=\"{}\"'.format(
        settings.DB_COLUMNS.USER_USERID, session['user_id'])
    g.user = models.select_user(params=('*'), conditions=(userCondition))


@app.route('/')
def index():
    """Render index.html for the site root."""
    page = render_template("index.html")
    return page


@app.route('/api/github-login')
def auth_githublogin():
    """Start the GitHub OAuth flow unless the session already has a user."""
    alreadyLoggedIn = session.get('user_id', None) is not None
    if alreadyLoggedIn:
        return 'Already logged in'
    return github.authorize()


@app.route('/api/github-logout')
def auth_githublogout():
    """Drop the login data from the session and redirect to the index page."""
    for sessionKey in ('user_id', 'oauth_token'):
        session.pop(sessionKey, None)
    return redirect(url_for('index'))


@github.access_token_getter
def token_getter():
    """Return the current user's OAuth token, or None when nobody is logged in."""
    currentUser = g.user
    if currentUser is None:
        return None
    return currentUser['oauth_token']


@app.route('/api/github-callback')
@github.authorized_handler
def auth_githubcallback(oauthToken):
    """Finish the GitHub OAuth flow and log the user in.

    On failure (no token) a flash message is queued. On success the user row
    holding this token is looked up, inserted as 'defaultUser' first if it
    does not exist, and its id and the token are stored in the session.
    Either way the client is redirected to the 'next' query parameter, or to
    the index page when that is absent.
    """
    # NOTE(review): 'next' comes straight from the query string and is not
    # validated before redirecting -- confirm this cannot be abused.
    redirectTarget = request.args.get('next') or url_for('index')
    if oauthToken is None:
        flash("Authorization failed.")
        return redirect(redirectTarget)

    def find_user_by_token():
        tokenCondition = '{}=\"{}\"'.format(
            settings.DB_COLUMNS.USER_OAUTH_TOKEN, oauthToken)
        return models.select_user(params=('*'), conditions=(tokenCondition))

    if find_user_by_token() is None:
        models.insert_user(
            'defaultUser', settings.NORMAL_USERTYPE, oauthToken)
    # Looked up again so a freshly inserted row is picked up too.
    user = find_user_by_token()

    session['user_id'] = user['userid']
    session.pop('oauth_token', None)
    session['oauth_token'] = oauthToken
    return redirect(redirectTarget)


@app.route('/api/github-user')
def auth_user():
    """Sync the logged-in user's row with their GitHub profile.

    Fetches the 'user' resource from GitHub, then:

    * if no row has the GitHub login as its username, writes the login onto
      the row identified by the session's 'user_id';
    * otherwise deletes the rows named 'defaultUser', refreshes the existing
      row's OAuth token, avatar URL and e-mail, and points the session at
      that row.

    Returns:
        JSON ``{"id": <userid>, "ghdata": <GitHub user payload>}``, or the
        GitHub error text when the API call fails.
    """
    try:
        userData = github.get('user')
        userLoginName = userData['login']
    except flask_github.GitHubError as githubError:
        return str(githubError)

    # The same 'username="<login>"' fragment serves as lookup condition and
    # as the assignment written by the first-login update.
    usernameClause = '{}=\"{}\"'.format(
        settings.DB_COLUMNS.USER_USERNAME, userLoginName)

    # check if user already exists
    user = models.select_user(params=('*'), conditions=(usernameClause))
    if user is None:
        # No row carries this login yet: name the session's row after it.
        # `user` is None on this path, so the id must come from the session;
        # subscripting `user` here used to raise a TypeError.
        userId = session.get('user_id', None)
        models.update_user(
            updatedValues=(usernameClause),
            setConditions=('{}=\"{}\"'.format(
                settings.DB_COLUMNS.USER_USERID, userId)))
    else:
        models.delete_user(
            deleteConditions=(
                '{}=\"{}\"'.format(
                    settings.DB_COLUMNS.USER_USERNAME,
                    'defaultUser')))
        models.update_user(
            updatedValues=(
                '{}=\"{}\"'.format(
                    settings.DB_COLUMNS.USER_OAUTH_TOKEN, session.get(
                        'oauth_token', None)),
                '{}=\"{}\"'.format(
                    settings.DB_COLUMNS.USER_AVATAR_URL, userData['avatar_url']),
                '{}=\"{}\"'.format(
                    settings.DB_COLUMNS.USER_USEREMAIL, userData['email'])
            ),
            setConditions=(usernameClause))
        # Re-read the row and make the session refer to it.
        user = models.select_user(params=('*'), conditions=(usernameClause))
        userId = user['userid']
        session.pop('user_id', None)
        session['user_id'] = userId
    return jsonify(dict(id=userId, ghdata=userData))


@app.route('/api/tasks', methods=['GET', 'POST'])
def api_tasks():
    """Task list endpoint.

    GET returns the stored tasks as a JSON array, optionally filtered by the
    'tags' query parameter, with each task's 'tasktags' decoded from its JSON
    string. When the database yields nothing, TasksEndpoint is asked to fetch
    tasks and the database is queried again, this time with one LIKE
    condition per ';'-separated tag. If that fetch hands back a
    RequestException, its text is returned instead.

    POST is routed here but not implemented and answers 501.
    """
    if request.method == 'POST':
        # Returning None from a view makes Flask raise; answer explicitly.
        return '', HTTPStatus.NOT_IMPLEMENTED

    def decode_tags(taskJson: dict):
        taskJson['tasktags'] = json.loads(taskJson['tasktags'])
        return taskJson

    tags = get_queryparam('tags')
    tagsColumn = settings.DB_COLUMNS.TASK_TASKTAGS

    if tags is None:
        tasks = models.select_task(params=('*'))
    else:
        tasks = models.select_task(
            params=('*'),
            conditions=('{} LIKE "%{}%"'.format(tagsColumn, tags)))

    if tasks is None:
        apiResponse = TasksEndpoint.get(tags=tags)
        # Report an upstream failure before querying the database again.
        if isinstance(apiResponse, requests.exceptions.RequestException):
            return str(apiResponse)
        if tags is None:
            tasks = models.select_task(params=('*'))
        else:
            tagConditions = tuple(
                '{} LIKE "%{}%"'.format(tagsColumn, tag)
                for tag in str(tags).split(';'))
            tasks = models.select_task(
                params=('*'),
                conditions=(tagConditions))

    if tasks is None:
        # Still nothing after the refresh: answer with an empty list rather
        # than failing while iterating None.
        return jsonify([])
    return jsonify([decode_tags(task) for task in tasks])


@app.route('/api/contest', methods=['GET', 'POST', 'DELETE'])
def api_contest():
    """
    Contest Endpoint.

    GET    ?code=<contestcode>: the contest with its "tasks" and "usergroups".
    DELETE ?code=<contestcode>: delete the contest, answers 204.
    POST with Content-Type = application/json: create a contest and return
    its contest code.
    {
            "contestname": "testContest-1",
            "contestadmin": 1,
            "date_start": "2017-05-12",
            "date_end": "2017-06-12",
            "visible": 1,
            "usergroups": [1, 2, 3] (groupids),
            "tasks": [1,2,3] (taskids)
    }

    GET/DELETE without 'code' and POST without a JSON body answer 400 with a
    JSON {"Error": ...} body; GET for a contest that is not found answers 404.
    """
    def error_response(message, status):
        # 'Content-Type' is the real header name; 'ContentType' was ignored.
        return (json.dumps({"Error": message}), status,
                {'Content-Type': 'application/json'})

    if request.method == 'POST':
        postJSON = request.get_json()
        if not postJSON:
            # Returning None here made Flask fail with a 500.
            return error_response("JSON body missing", HTTPStatus.BAD_REQUEST)

        # insert contest to db contest table
        contestCode = models.insert_contest(
            postJSON[settings.DB_COLUMNS.CONTEST_CONTESTNAME],
            postJSON[settings.DB_COLUMNS.CONTEST_CONTESTADMIN],
            postJSON[settings.DB_COLUMNS.CONTEST_DATE_START],
            postJSON[settings.DB_COLUMNS.CONTEST_DATE_END],
            postJSON[settings.DB_COLUMNS.CONTEST_VISIBLE]
        )

        # insert taskids with contestcode in db contains_task table
        for taskID in postJSON["tasks"]:
            models.insert_contains_task(contestCode, taskID)

        # insert groupIds with contestcode in db group_in_contest table
        for groupID in postJSON["usergroups"]:
            models.insert_group_in_contest(groupID, contestCode)

        # return the contestcode
        return contestCode

    # GET and DELETE both identify the contest through ?code=...
    code = get_queryparam('code')
    if code is None:
        return error_response(
            "\'code\' parameter missing", HTTPStatus.BAD_REQUEST)

    codeCondition = '{}=\"{}\"'.format(
        settings.DB_COLUMNS.CONTEST_CONTESTCODE, code)

    if request.method == 'DELETE':
        models.delete_contest(deleteConditions=(codeCondition))
        return ('', 204)

    # GET (Flask also routes HEAD here; it is served like GET).
    contestJSON = models.select_contest(
        params=('*'), conditions=(codeCondition))
    if contestJSON is None:
        # Presumably select_contest yields None when nothing matches, as
        # select_user/select_task do elsewhere in this file -- TODO confirm.
        return error_response("contest not found", HTTPStatus.NOT_FOUND)

    # attach all of the contest's tasks and groups
    contestJSON["tasks"] = models.get_tasks_in_contest(code)
    contestJSON["usergroups"] = models.select_group_in_contest(
        params=('*'),
        conditions=('{}=\"{}\"'.format(
            settings.DB_COLUMNS.GROUP_IN_CONTEST_CONTEST, code))
    )
    return jsonify(contestJSON)


@app.route('/api/contests.joined', methods=['GET', 'POST', 'DELETE'])
def api_contests_join():
    """
    Join/Leave a contest, GET all contests a user is in.

    GET    ?user=<userid>: JSON result of select_joined_contest for the user.
    POST   JSON body naming the user and the contest: join, answers 204.
    DELETE JSON body naming the user and the contest: leave, answers 204.

    POST/DELETE without a JSON body or without both fields answer 400.
    """
    userKey = settings.DB_COLUMNS.JOINED_CONTEST_USER
    contestKey = settings.DB_COLUMNS.JOINED_CONTEST_CONTEST

    if request.method in ('POST', 'DELETE'):
        payload = request.get_json()
        if not payload or userKey not in payload or contestKey not in payload:
            # Subscripting a missing body or key used to fail with a 500.
            content = {"Error": "JSON body with user and contest required"}
            return (json.dumps(content), HTTPStatus.BAD_REQUEST,
                    {'Content-Type': 'application/json'})
        if request.method == 'POST':
            models.insert_joined_contest(payload[userKey], payload[contestKey])
        else:
            models.delete_joined_contest(payload[userKey], payload[contestKey])
        return ('', 204)

    # GET (Flask also routes HEAD here; previously that returned None).
    joinedContests = models.select_joined_contest(
        params=('*'),
        conditions=('{}=\"{}\"'.format(userKey, get_queryparam('user')))
    )
    return jsonify(joinedContests)


@app.route('/api/contest.results', methods=['GET'])
def api_contest_results():
    """Return a user's latest submissions for a contest as JSON.

    Query parameters: 'user', 'contest' and optional 'force'. Stored results
    are returned directly unless 'force' is given or nothing is stored; in
    that case UserContestResultsEndpoint is called with the user's Codeforces
    handle (per the original comment, to fetch the submissions from the
    Codeforces API and insert them into the database) and the stored results
    are read again.

    Answers 400 with a JSON error when the handle is missing or is still the
    "defaultUser" placeholder.
    """
    user = get_queryparam('user')
    contest = get_queryparam('contest')
    force = get_queryparam('force')

    storedResults = models.get_latest_submissions(user, contest)
    if storedResults is not None and force is None:
        return jsonify(storedResults)

    cfHandle = models.get_cfhandle(user)
    # Compare by value: `is "defaultUser"` tested object identity, which is
    # not guaranteed to hold for equal strings.
    if cfHandle is None or cfHandle == "defaultUser":
        content = {"Error": "user's cf handle wrong or missing"}
        # 'Content-Type' is the real header name; 'ContentType' was ignored.
        return (json.dumps(content), HTTPStatus.BAD_REQUEST,
                {'Content-Type': 'application/json'})

    UserContestResultsEndpoint.get(cfHandle, contest)
    return jsonify(models.get_latest_submissions(user, contest))


@app.route('/api/contest.joined', methods=['GET'])
def api_contest_join():
    """
    GET the users that joined the contest given by the 'code' query parameter
    """
    contestCondition = '{}=\"{}\"'.format(
        settings.DB_COLUMNS.JOINED_CONTEST_CONTEST,
        get_queryparam('code'))
    joinedUsers = models.select_joined_contest(
        params=('user'), conditions=(contestCondition))
    return jsonify(joinedUsers)


@app.route('/api/contests', methods=['GET'])
def api_contests():
    """List contests as JSON.

    With a non-empty 'visible' query parameter, all contests whose visible
    column is 1 are returned; otherwise, with 'admin', all contests created
    by that admin. With neither parameter the request answers 400.
    """
    if get_queryparam('visible'):
        # get all visible contests
        condition = '{}=\"{}\"'.format(
            settings.DB_COLUMNS.CONTEST_VISIBLE, 1)
    elif get_queryparam('admin'):
        # get all contests created by "admin"
        condition = '{}=\"{}\"'.format(
            settings.DB_COLUMNS.CONTEST_CONTESTADMIN,
            get_queryparam('admin'))
    else:
        # Returning None here made Flask fail with a 500.
        content = {"Error": "\'visible\' or \'admin\' parameter missing"}
        return (json.dumps(content), HTTPStatus.BAD_REQUEST,
                {'Content-Type': 'application/json'})

    contests = models.select_contest(params=('*'), conditions=(condition))
    return jsonify(contests)


@app.route('/api/user', methods=['GET'])
def api_user():
    """User endpoint placeholder; answers 501 until it is implemented."""
    # TODO: return user
    # A view must not return None (Flask raises and answers 500), so say
    # explicitly that this endpoint is not implemented yet.
    return '', HTTPStatus.NOT_IMPLEMENTED


@app.route('/api/user.cfhandle', methods=['POST'])
def api_user_cfhandle():
    """
        POST endpoint to set a user's codeforces handle

        Expects a JSON body with "handle" (the new handle) and "user" (the id
        of the user row to update). Returns the handle on success, and 400
        with a JSON error when the body or one of the fields is missing.
    """
    payload = request.get_json()
    if not payload or 'handle' not in payload or 'user' not in payload:
        # Subscripting a missing body or key used to fail with a 500.
        content = {"Error": "JSON body with handle and user required"}
        return (json.dumps(content), HTTPStatus.BAD_REQUEST,
                {'Content-Type': 'application/json'})

    handle = payload['handle']
    # NOTE(review): both values go into the statement fragments unescaped;
    # confirm that models.update_user sanitises them.
    models.update_user(
        updatedValues=('{}=\"{}\"'.format(
            settings.DB_COLUMNS.USER_CODEFORCES_HANDLE, handle)),
        setConditions=('{}=\"{}\"'.format(
            settings.DB_COLUMNS.USER_USERID, payload['user']))
    )
    return handle


@app.route('/api/usergroup', methods=['GET', 'POST'])
def api_usergroup():
    """
    Usergroup Endpoint.

    GET ?group=<groupid>: the usergroup as JSON.
    POST with Content-Type = application/json: create a usergroup, add its
    admin as a member and return the new group id as text.
    {
        "groupname": groupname (String)
        "groupadmin": userID of group admin
    }

    POST without a JSON body answers 400.
    """
    if request.method == 'POST':
        # add a new Usergroup
        postJSON = request.get_json()
        if not postJSON:
            # Returning None here made Flask fail with a 500.
            content = {"Error": "JSON body missing"}
            return (json.dumps(content), HTTPStatus.BAD_REQUEST,
                    {'Content-Type': 'application/json'})
        groupAdmin = postJSON[settings.DB_COLUMNS.USERGROUP_GROUPADMIN]
        usergroupID = models.insert_usergroup(
            postJSON[settings.DB_COLUMNS.USERGROUP_GROUPNAME],
            groupAdmin)
        # the admin becomes a member of the new group
        models.insert_in_usergroup(usergroupID, groupAdmin)
        return str(usergroupID)

    # GET (Flask also routes HEAD here; previously that returned None).
    usergroup = models.select_usergroup(
        params=('*'),
        conditions=('{}=\"{}\"'.format(
            settings.DB_COLUMNS.USERGROUP_GROUPID,
            get_queryparam('group')
        ))
    )
    return jsonify(usergroup)


@app.route('/api/usergroup.memberships', methods=['GET'])
def api_usergroup_membership():
    """Return the result of models.get_memberships_of as JSON.

    The 'user' and 'admin' query parameters are passed through as given.
    """
    userParam = get_queryparam('user')
    adminParam = get_queryparam('admin')
    return jsonify(models.get_memberships_of(user=userParam, admin=adminParam))


@app.route('/api/usergroup.members', methods=['GET', 'POST'])
def api_usergroup_members():
    """
    Usergroup members Endpoint.

    GET ?group=<groupid>: JSON array with one select_user result per user id
    stored for the group.
    POST with Content-Type = application/json: add a user to a group,
    answers 204.
    {
        "usergroup": usergroupID
        "user": userID
    }

    POST without a JSON body answers 400.
    """
    if request.method == 'POST':
        # add user to group
        postJSON = request.get_json()
        if not postJSON:
            # Returning None here made Flask fail with a 500.
            content = {"Error": "JSON body missing"}
            return (json.dumps(content), HTTPStatus.BAD_REQUEST,
                    {'Content-Type': 'application/json'})
        models.insert_in_usergroup(
            postJSON[settings.DB_COLUMNS.IN_USERGROUP_USERGROUP],
            postJSON[settings.DB_COLUMNS.IN_USERGROUP_USER])
        # The success path used to end without a return value, so every
        # successful POST still answered 500.
        return ('', 204)

    # GET (Flask also routes HEAD here; previously that returned None).
    userIDsInGroup = models.select_in_usergroup(
        params=(settings.DB_COLUMNS.IN_USERGROUP_USER),
        conditions=('{}=\"{}\"'.format(
            settings.DB_COLUMNS.IN_USERGROUP_USERGROUP,
            get_queryparam('group')
        ))
    )
    # Presumably the select yields None when the group has no members, as
    # select_task appears to -- treat that as an empty list. TODO confirm.
    userObjects = [
        models.select_user(
            params=('*'),
            conditions=('{}=\"{}\"'.format(
                settings.DB_COLUMNS.USER_USERID, userID))
        )
        for userID in userIDsInGroup or []
    ]
    return jsonify(userObjects)


@app.route('/js/<path:path>')
def send_js(path):
    """Serve a file from the static folder's js/ directory."""
    staticName = 'js/{}'.format(path)
    return app.send_static_file(staticName)

@app.route('/css/<path:path>')
def send_css(path):
    """Serve a file from the static folder's css/ directory."""
    staticName = 'css/{}'.format(path)
    return app.send_static_file(staticName)

@app.route('/favicon.ico')
def send_favicon():
    """Serve favicon.ico from the static folder."""
    icon = app.send_static_file('favicon.ico')
    return icon

@app.route('/', defaults={'path': 'index.html'})
@app.route('/<path:path>')
def catch_all(path): # pylint: disable=unused-argument
    """Render index.html for any path not matched by another route."""
    page = render_template("index.html")
    return page