SpartaHack/SpartaHack-API

View on GitHub
resources/sessions.py

Summary

Maintainability
B
5 hrs
Test Coverage
from flask_restful import Resource
from werkzeug.exceptions import BadRequest
from flask import request, jsonify, g
from datetime import datetime
from sqlalchemy import exists, and_
from sqlalchemy.orm.exc import NoResultFound
from common.json_schema import Sessions_Schema, User_Schema
from common.utils import headers, is_logged_in, has_admin_privileges, waste_time, verify_pass
from common.utils import bad_request, unauthorized, forbidden, not_found, internal_server_error, unprocessable_entity, conflict
from common.utils import verify_id_token
import random
import secrets


class Sessions_D(Resource):
    """
    For DELETE for specific session
    get http headers using request.headers.get("header_name")
    """

    def delete(self, user_token):
        """
        Delete the session. Required data: user_token
        """
        # getting the user. Assuming the user exists. Case of user not existing is checked below
        try:
            user = g.session.query(g.Base.classes.users).filter(
                g.Base.classes.users.auth_token == user_token).one()
        except NoResultFound:
            # *If no user with that email is found with that token then you send 403 error
            forbidden["error_list"]["token"] = "User not logged in"
            return (forbidden, 403, headers)
        except Exception as err:
            print(type(err))
            print(err)
            return (internal_server_error, 500, headers)
        auth_token = secrets.token_urlsafe(25)
        user.auth_token = auth_token

        g.session.commit()
        return ('', 204, headers)


class Sessions_C(Resource):
    """
    To create new session using POST
    """

    def post(self):
        """
        for POST for session
        """
        try:
            data = request.get_json(force=True)
        except BadRequest:
            return (bad_request, 400, headers)

        # *request data validation. Check for empty fields will be done by frontend
        validation = Sessions_Schema().validate(data)
        if validation:
            unprocessable_entity["error_list"] = validation
            return (unprocessable_entity, 422, headers)

        # check if ID_Token is valid
        payload = verify_id_token(data["ID_Token"])
        if payload == False:
            return (bad_request, 400, headers)

        # getting the user. Assuming the user exists. Case of user not existing is checked below
        try:
            user = g.session.query(g.Base.classes.users).filter(
                g.Base.classes.users.email == data["email"]).one()
        except NoResultFound:
            # *If no user with that email is found with that token then you send 403 error
            forbidden["error_list"]["email"] = "No user found with that email and password combination"
            return (forbidden, 403, headers)
        except Exception as err:
            print(type(err))
            print(err)
            return (internal_server_error, 500, headers)

        auth_token = secrets.token_urlsafe(25)

        user.auth_token = auth_token
        g.session.commit()

        ret = User_Schema().dump(user)

        # *This is different than old API in the sense that a user does not necessarily have a application at the time of account creation. User has the option of submitting the application later on.
        ret["application_id"] = user.applications_collection[0].id if user.applications_collection else None

        # *set rsvp_id if rsvp object is found in the rsvps_collection i.e. if the user has rsvped
        ret["rsvp_id"] = user.rsvps_collection[0].id if user.rsvps_collection else None

        return (ret, 200, headers)