oplik0/cherrydoor

View on GitHub
cherrydoor/api/manage_users_endpoint.py

Summary

Maintainability
A
0 mins
Test Coverage
import datetime as dt
import json
from typing import List

from aiojobs.aiohttp import atomic
from aiohttp.web import HTTPBadRequest, HTTPNotFound, HTTPConflict, Request
from aiohttp.web_response import Response
from aiohttp_rest_api import AioHTTPRestEndpoint
from aiohttp_rest_api.responses import respond_with_json

from cherrydoor.auth import check_api_permissions, check_if_self, register_users
from cherrydoor.database import (
    find_user_by_username,
    find_user_by_uid,
    modify_user,
    user_exists,
    create_user,
    delete_user,
)
from cherrydoor.util import get_datetime


class ManageUsersEndpoint(AioHTTPRestEndpoint):
    def connected_routes(self) -> List[str]:
        """"""
        return ["/users"]

    @atomic
    async def put(self, request: Request) -> Response:
        """
        ---
        summary: Create multiple new users
        description:
        security:
            - Bearer Authentication: [users_manage]
            - X-API-Key Authentication: [users_manage]
            - Session Authentication: [users_manage]
        tags:
            - users
        requestBody:
            description: JSON containing a list of new user objects
            required: true
            content:
                application/json:
                    schema:
                        type: object
                        properties:
                            users:
                                type: array
                                items:
                                    type: object
                                    properties:
                                        username:
                                            type: string
                                            description: username of the user you want to modify
                                            required: true
                                        password:
                                            type: string
                                            description: unhashed password that will be hashed before being added to the user
                                        cards:
                                            description: a single card or list of cards that will be assigned to the user
                                            oneOf:
                                                - type: array
                                                  description: list of all cards that will be assigned to the user
                                                  items:
                                                    type: string
                                                    format: mifare uid
                                                    pattern: '^([0-9a-fA-F]{8}|([0-9a-fA-F]{14}|[0-9a-fA-F]{20})$'
                                                - type: string
                                                  description: a single card that will be assigned to the user
                                                  format: mifare uid
                                                  pattern: '^([0-9a-fA-F]{8}|([0-9a-fA-F]{14}|[0-9a-fA-F]{20})$'
                                        permissions:
                                            description: a single permission or list of permissions that the user will have (you need to have a permission to assign it to others)
                                            oneOf:
                                                - type: array
                                                  description: list of all permissions that the user will have
                                                  items:
                                                    type: string
                                                    enum:
                                                        - admin
                                                        - enter
                                                        - logs
                                                        - users_read
                                                        - users_manage
                                                        - cards
                                                - type: string
                                                  description: a single permission you want to give the user
                                                  enum:
                                                    - admin
                                                    - enter
                                                    - logs
                                                    - users_read
                                                    - users_manage
                                                    - cards
                        example:
                            users:
                                - username: example
                                  password: hunter2
                                  cards:
                                    - ABABABAB
                                    - 12345678
                                  permissions: enter
                                - username: example2
                                  password: hunter4
                                  cards: FFFFFFFF
                                  permissions:
                                    - enter
                                    - cards
                                    - users_read
        responses:
            "200":
                description: A JSON document indicating success along with some information about the user
                content:
                    application/json:
                        schema:
                            $ref: '#/components/schemas/User'

            "401":
                description: A JSON document indicating error in request (user not authenticated)
                content:
                    application/json:
                        schema:
                            $ref: '#/components/schemas/Error'
            "403":
                description: A JSON document indicating error in request (user doesn't have permission to preform this action)
                content:
                    application/json:
                        schema:
                            $ref: '#/components/schemas/Error'
            "409":
                description: A JSON document indicating error in request (user already exists)
                content:
                    application/json:
                        schema:
                            $ref: '#/components/schemas/Error'
        """

        data = await request.json()
        if isinstance(data.get("users", None), list):
            users = data["users"]
        elif isinstance(data.get("users", ""), str):
            users = [data.get("users", "")]
        else:
            raise HTTPBadRequest(
                reason="no users provided",
                body=json.dumps(
                    {"Ok": False, "Error": "no users provided", "status_code": 401}
                ),
                content_type="application/json",
            )
        permissions = set()
        for user in users:
            user_permissions = user.get("permissions", [])
            if not isinstance(user_permissions, list):
                user_permission = [user_permissions]
            permissions.update(set(user_permissions))
        await check_api_permissions(request, ["users_manage", *permissions])
        result_users = []
        for user_data in users:
            username = user_data.get("username", False)
            if not username:
                raise HTTPBadRequest(
                    reason="no username provided for at least one user",
                    body=json.dumps(
                        {
                            "Ok": False,
                            "Error": "no username provided for at least one user",
                            "status_code": 401,
                        }
                    ),
                    content_type="application/json",
                )
            user = await user_exists(request.app, username)
            if user:
                raise HTTPConflict(
                    reason=f"User with this username ({username}) already exists",
                    body=json.dumps(
                        {
                            "Ok": False,
                            "Error": f"User with this username ({username}) already exists",
                            "status_code": 409,
                        }
                    ),
                    content_type="application/json",
                )
            cards = user_data.get("cards", [])
            if not isinstance(cards, list):
                user_data["cards"] = [cards]
        await register_users(request.app, users)
        users = [
            {
                "username": user.get("username"),
                "cards": user.get("cards", []),
                "permissions": user.get("permissions", []),
            }
            for user in users
        ]
        return respond_with_json(
            {"Ok": True, "Error": None, "status_code": 200, "users": users}
        )