DivisionBy-Zero/erpa-sweng

View on GitHub
server/operations.py

Summary

Maintainability
C
7 hrs
Test Coverage
from contextlib import contextmanager
from typing import Dict, List, Optional, Set
import base64

from datetime import datetime
from sqlalchemy.orm import Session

from contexts import with_context_using_instance
from crypto import Crypto
from game_refiner import refine_queries
from models import Game, GameStatus, User, UserAuth, UserAuthChallenge, \
    UserSessionToken, UserUuid, Username, \
    PlayerJoinGameRequest, PlayerInGameStatus
from session import SessionBroker


def with_session(f):
    @contextmanager
    def mk_session(ops):
        with ops.session_broker.get_session() as session:
            yield session
    return with_context_using_instance(mk_session)(f)


class Operations:
    def __init__(self, session_broker: SessionBroker = None):
        self.session_broker = session_broker
        if not self.session_broker:
            self.session_broker = SessionBroker()

    @with_session
    def get_authenticated_user(self, authtoken: str, session: Session) -> User:
        user = (session.query(User)
                .join(UserSessionToken,
                      User.uuid == UserSessionToken.user_uuid)
                .filter(UserSessionToken.session_token == authtoken)
                .scalar()
                )
        return user

    @with_session
    def create_game(self, new_game: Game, user: User, session: Session
                    ) -> Game:
        if not new_game.uuid_is_valid():
            new_game.generate_new_uuid()

        existing_game_uuid = (session.query(Game.uuid)
                              .filter(Game.uuid == new_game.uuid)
                              .scalar()
                              )

        if existing_game_uuid:
            new_game.generate_new_uuid()
            return self.create_game(new_game, user)

        if not user.is_gm:
            raise ValueError("User is not registered as Game Master")

        new_game.game_status = GameStatus.CREATED
        new_game.gm_user_uuid = user.uuid
        new_game.timestamp_created = datetime.now()
        new_game.timestamp_modified = datetime.now()

        session.add(new_game)

        return new_game

    @with_session
    def update_game(self, updated_game: Game, user: User, session: Session
                    ) -> Game:
        existing_game = (session.query(Game)
                         .filter(Game.uuid == updated_game.uuid)
                         .filter(Game.gm_user_uuid == user.uuid)  # ZKP
                         .scalar()
                         )

        if not existing_game:
            raise KeyError("Game does not exist or user does not have "
                           "the permission to access it.")

        existing_game.fromdict(updated_game.asdict(
            exclude=['uuid', 'gm_user_uuid', 'timestamp_created']))
        existing_game.timestamp_modified = datetime.now()

        session.add(existing_game)

        return existing_game

    @with_session
    def get_game(self, uuid: str, session: Session) -> Game:
        existing_game = (session.query(Game)
                         .filter(Game.uuid == uuid)
                         .scalar()
                         )

        # TODO: Filter out information if the user is not GM or been accepted
        return existing_game

    @with_session
    def get_games(self, session: Session,
                  page_start: int = 0, page_length: int = 20,
                  refinements: Dict[str, str] = dict()) -> List[Game]:
        return (refine_queries(session.query(Game), refinements)
                .offset(page_start)
                .limit(page_length)
                ).all()

    @with_session
    def get_game_participants(self, game_uuid: str, user: Optional[User],
                              session: Session) -> List[PlayerJoinGameRequest]:
        game = self.get_game(game_uuid)
        game_join_requests = (session.query(PlayerJoinGameRequest)
                              .filter(PlayerJoinGameRequest.game_uuid
                                      == game_uuid)
                              .all()
                              )

        def filter_game_join_requests(gp: PlayerJoinGameRequest) -> bool:
            return gp.request_status == PlayerInGameStatus.CONFIRMED or \
                (user is not None and gp.user_uuid == user.uuid)

        # If not the game master
        if not user or user.uuid != game.gm_user_uuid:
            game_join_requests = [gp for gp in game_join_requests
                                  if filter_game_join_requests(gp)]
        return game_join_requests

    @with_session
    def join_game(self, game_uuid: str, user: User, session: Session
                  ) -> PlayerJoinGameRequest:
        if user.uuid == self.get_game(game_uuid).gm_user_uuid:
            raise ValueError("The GM can't join it's own game!")
        game_participants = (session.query(PlayerJoinGameRequest)
                             .filter(PlayerJoinGameRequest.game_uuid
                                     == game_uuid)
                             .all()
                             )
        existing_request = next((jreq for jreq in game_participants
                                 if jreq.user_uuid == user.uuid), None)
        if existing_request:
            request = existing_request
            if request.request_status in [PlayerInGameStatus.HAS_QUIT,
                                          PlayerInGameStatus.REMOVED,
                                          PlayerInGameStatus.REQUEST_TO_JOIN]:
                request.request_status = PlayerInGameStatus.REQUEST_TO_JOIN
            else:
                raise ValueError("A join request already exist.")
        else:
            request = PlayerJoinGameRequest(game_uuid=game_uuid,
                                            user_uuid=user.uuid)
        session.add(request)
        return request

    @with_session
    def update_game_join_request(self, game_uuid: str, user: User,
                                 updated_request: PlayerJoinGameRequest,
                                 session: Session) -> PlayerJoinGameRequest:
        def joined_user_leaving_game(old_request: PlayerJoinGameRequest,
                                     new_request: PlayerJoinGameRequest):
            if not old_request or not new_request:
                return False
            return (old_request.request_status == PlayerInGameStatus.REQUEST_TO_JOIN and
                    new_request.request_status == PlayerInGameStatus.HAS_QUIT)

        def has_left_user_joining_game(old_request: PlayerJoinGameRequest,
                                       new_request: PlayerJoinGameRequest):
            if not old_request or not new_request:
                return False
            return (old_request.request_status == PlayerInGameStatus.HAS_QUIT and
                    new_request.request_status == PlayerInGameStatus.REQUEST_TO_JOIN)

        game = self.get_game(game_uuid)
        game_join_request = (session.query(PlayerJoinGameRequest)
                             .filter(PlayerJoinGameRequest.game_uuid
                                     == game_uuid)
                             .filter(PlayerJoinGameRequest.user_uuid
                                     == updated_request.user_uuid)
                             .scalar()
                             )

        if (game.gm_user_uuid != user.uuid):
            if not (joined_user_leaving_game(game_join_request, updated_request) or
                    has_left_user_joining_game(game_join_request, updated_request)):
                raise KeyError("Game does not exist or user does not have "
                               "the permission to access it.")  # ZKP

        if not game_join_request:
            raise KeyError("Player with uuid {} has no join request"
                           "".format(user.uuid))
        game_join_request.request_status = updated_request.request_status
        game_join_request.timestamp_modified = datetime.now()
        session.add(game_join_request)
        return game_join_request

    @with_session
    def register_username(self, username_str: str, session: Session) -> str:
        existing_username = (session.query(Username.user_uuid)
                             .filter(Username.username == username_str)
                             .scalar()
                             )
        if existing_username:
            raise KeyError("This username already taken")

        user_uuid = UserUuid()
        session.add(user_uuid)
        session.flush()  # Required to force defaults generation
        username = Username(username=username_str,
                            user_uuid=user_uuid.user_uuid)

        session.add(username)
        session.flush()

        return user_uuid.user_uuid

    @with_session
    def get_user_uuid_from_username(self, username_str: str, session: Session
                                    ) -> str:
        existing_username_uuid = (session.query(Username.user_uuid)
                                  .filter(Username.username == username_str)
                                  .scalar()
                                  )
        if not existing_username_uuid:
            raise KeyError("No user with such username could be found")

        return existing_username_uuid

    @with_session
    def get_username_from_user_uuid(self, user_uuid: str, session: Session
                                    ) -> str:
        existing_username = (session.query(Username)
                             .filter(Username.user_uuid == user_uuid)
                             .scalar()
                             )
        if not existing_username:
            raise KeyError("No username with such uuid could be found")

        return existing_username

    @with_session
    def register_user_auth(self, user_auth: UserAuth, session: Session
                           ) -> UserAuth:
        user_uuid = user_auth.user_uuid
        existing_user_auth = (session.query(UserAuth.public_key)
                              .filter(UserAuth.user_uuid == user_uuid)
                              .scalar()
                              )

        if existing_user_auth:
            raise KeyError("User has already registered a public key")

        user_auth.authentication_strategy = "Grenouille"
        user_auth.timestamp_registered = datetime.now()

        session.add(user_auth)
        return user_auth

    @with_session
    def get_user_auth(self, user_uuid: str, session: Session) -> UserAuth:
        existing_user_auth = (session.query(UserAuth)
                              .filter(UserAuth.user_uuid == user_uuid)
                              .scalar()
                              )

        if not existing_user_auth:
            raise KeyError("User has not registered a public key")
        return existing_user_auth

    @with_session
    def register_user(self, user: User, session: Session):
        existing_user = (session.query(User.uuid)
                         .filter(User.uuid == user.uuid)
                         .scalar()
                         )

        if existing_user:
            raise KeyError("A user profile with this UUID already exists")

        user.timestamp_created = datetime.now()
        session.add(user)
        return user

    @with_session
    def update_user(self, updated_user: User, requesting_user: User,
                    session: Session) -> User:
        existing_user = (session.query(User)
                         .filter(User.uuid == requesting_user.uuid)
                         .scalar()
                         )

        if not existing_user:
            raise KeyError("Couldn't find the requested profile")

        existing_user.fromdict(updated_user.asdict(
            exclude=['uuid', 'timestamp_created']))
        existing_user.timestamp_modified = datetime.now()
        session.add(existing_user)
        return existing_user

    @with_session
    def get_user(self, user_uuid: str, session: Session):
        existing_user = (session.query(User)
                         .filter(User.uuid == user_uuid)
                         .scalar()
                         )

        if not existing_user:
            raise KeyError("User profile does not exist")

        return existing_user

    @with_session
    def get_user_from_session_token(self, session_token: str,
                                    session: Session):
        existing_token_user_uuid = (session.query(UserSessionToken.user_uuid)
                                    .filter(UserSessionToken.session_token
                                            == session_token)
                                    .scalar()
                                    )
        if not existing_token_user_uuid:
            raise KeyError("Unknown session token")

        return self.get_user(existing_token_user_uuid)

    @with_session
    def get_user_tokens_for_user(self, user_uuid: str, session: Session
                                 ) -> Set[UserSessionToken]:
        return set(session.query(UserSessionToken)
                   .filter(UserSessionToken.user_uuid == user_uuid)
                   .all()
                   )

    @with_session
    def get_auth_challenge_for_user(self, user_uuid: str, session: Session
                                    ) -> UserAuthChallenge:
        existing_challenge = (session.query(UserAuthChallenge)
                              .filter(UserAuthChallenge.user_uuid == user_uuid)
                              .scalar()
                              )
        if not existing_challenge:
            raise ValueError('No challenge exist for the specified user uuid.')
        return existing_challenge

    @with_session
    def gen_auth_challenge_for_user(self, user_uuid: str, session: Session
                                    ) -> UserAuthChallenge:
        existing_challenge = (session.query(UserAuthChallenge)
                              .filter(UserAuthChallenge.user_uuid == user_uuid)
                              .scalar()
                              )
        if existing_challenge:
            session.delete(existing_challenge)
            session.flush()
        new_challenge = UserAuthChallenge(user_uuid=user_uuid)
        session.add(new_challenge)
        return new_challenge

    @with_session
    def session_token_from_signed_challenge(self, user_uuid: str,
                                            b64_signed: str, session: Session
                                            ) -> UserSessionToken:
        existing_challenge = self.get_auth_challenge_for_user(user_uuid)
        user_auth = self.get_user_auth(user_uuid)
        if not Crypto().verify_message_signature(
                b64_signed, existing_challenge.user_challenge, user_auth):
            raise ValueError('Invalid challenge.')
        session.delete(existing_challenge)
        created_token = UserSessionToken(user_uuid=user_uuid)
        session.add(created_token)
        return created_token