fossasia/open-event-orga-server

View on GitHub
app/models/user.py

Summary

Maintainability
C
1 day
Test Coverage
import random
from datetime import datetime

import humanize
import pytz
from citext import CIText
from coolname import generate
from flask import url_for
from flask_scrypt import generate_password_hash, generate_random_salt
from slugify import slugify
from sqlalchemy import desc, event
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound
from sqlalchemy.sql import func

from app.api.helpers.db import get_count
from app.api.helpers.utilities import get_serializer
from app.models import db
from app.models.base import SoftDeletionModel
from app.models.custom_system_role import UserSystemRole
from app.models.event import Event
from app.models.helpers.versioning import clean_html, clean_up_string
from app.models.notification import Notification
from app.models.panel_permission import PanelPermission
from app.models.permission import Permission
from app.models.role import Role
from app.models.service import Service
from app.models.session import Session
from app.models.speaker import Speaker
from app.models.user_permission import UserPermission
from app.models.users_events_role import UsersEventsRoles as UER

# System-wide
ADMIN = 'admin'
SUPERADMIN = 'super_admin'

MARKETER = 'Marketer'
SALES_ADMIN = 'Sales Admin'

SYS_ROLES_LIST = [
    ADMIN,
    SUPERADMIN,
]

# Event-specific
TRACK_ORGANIZER = 'track_organizer'
MODERATOR = 'moderator'
REGISTRAR = 'registrar'


class User(SoftDeletionModel):
    """User model class"""

    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    _email = db.Column(CIText, unique=True, nullable=False)
    _password = db.Column(db.String(128), nullable=False)
    facebook_id = db.Column(db.BigInteger, unique=True, nullable=True, name='facebook_id')
    facebook_login_hash = db.Column(db.String, nullable=True)
    reset_password = db.Column(db.String(128))
    salt = db.Column(db.String(128))
    avatar_url = db.Column(db.String)
    tokens = db.Column(db.Text)
    first_name = db.Column(db.String, nullable=True)
    last_name = db.Column(db.String, nullable=True)
    details = db.Column(db.String)
    contact = db.Column(db.String)
    facebook_url = db.Column(db.String)
    twitter_url = db.Column(db.String)
    instagram_url = db.Column(db.String)
    google_plus_url = db.Column(db.String)
    original_image_url = db.Column(db.String, nullable=True, default=None)
    thumbnail_image_url = db.Column(db.String)
    small_image_url = db.Column(db.String)
    icon_image_url = db.Column(db.String)
    is_super_admin = db.Column(db.Boolean, default=False)
    is_admin = db.Column(db.Boolean, default=False)
    is_sales_admin = db.Column(db.Boolean, default=False)
    is_marketer = db.Column(db.Boolean, default=False)
    is_verified = db.Column(db.Boolean, default=False)
    is_blocked = db.Column(db.Boolean, nullable=False, default=False)
    is_profile_public = db.Column(
        db.Boolean, nullable=False, default=False, server_default='False'
    )
    public_name = db.Column(db.String)
    was_registered_with_order = db.Column(db.Boolean, default=False)
    last_accessed_at = db.Column(db.DateTime(timezone=True))
    created_at = db.Column(db.DateTime(timezone=True), default=func.now())
    # Event Invoice Details
    billing_contact_name = db.Column(db.String)
    billing_phone = db.Column(db.String)
    billing_state = db.Column(db.String)
    billing_country = db.Column(db.String)
    billing_tax_info = db.Column(db.String)
    company = db.Column(db.String)
    billing_address = db.Column(db.String)
    billing_city = db.Column(db.String)
    language_prefrence = db.Column(db.String)
    billing_zip_code = db.Column(db.String)
    billing_additional_info = db.Column(db.String)

    rocket_chat_token = db.Column(db.String)

    # relationships
    speaker = db.relationship('Speaker', backref="user")
    favourite_events = db.relationship('UserFavouriteEvent', backref="user")
    session = db.relationship('Session', backref="user")
    feedback = db.relationship('Feedback', backref="user")
    access_codes = db.relationship('AccessCode', backref="user")
    discount_codes = db.relationship('DiscountCode', backref="user")
    marketer_events = db.relationship(
        'Event',
        viewonly=True,
        secondary='join(UserSystemRole, CustomSysRole,'
        ' and_(CustomSysRole.id == UserSystemRole.role_id, CustomSysRole.name == "Marketer"))',
        primaryjoin='UserSystemRole.user_id == User.id',
        secondaryjoin='Event.id == UserSystemRole.event_id',
    )
    sales_admin_events = db.relationship(
        'Event',
        viewonly=True,
        secondary='join(UserSystemRole, CustomSysRole,'
        ' and_(CustomSysRole.id == UserSystemRole.role_id, CustomSysRole.name == "Sales Admin"))',
        primaryjoin='UserSystemRole.user_id == User.id',
        secondaryjoin='Event.id == UserSystemRole.event_id',
    )

    @hybrid_property
    def password(self):
        """
        Hybrid property for password
        :return:
        """
        return self._password

    @password.setter
    def password(self, password):
        """
        Setter for _password, saves hashed password, salt and reset_password string
        :param password:
        :return:
        """
        salt = str(generate_random_salt(), 'utf-8')
        self._password = str(generate_password_hash(password, salt), 'utf-8')
        hash_ = random.getrandbits(128)
        self.reset_password = str(hash_)
        self.salt = salt

    @hybrid_property
    def email(self):
        """
        Hybrid property for email
        :return:
        """
        return self._email

    @email.setter
    def email(self, email):
        """
        Setter for _email,
        set user to 'not verified' if email is updated
        :param email:
        :return:
        """
        if self._email != email:
            self._email = email
            self.is_verified = False

    # User Permissions
    def can_publish_event(self):
        """
        Checks if User can publish an event
        """
        perm = UserPermission.query.filter_by(name='publish_event').first()
        if not perm:
            return self.is_verified

        if self.is_verified is False:
            return perm.unverified_user

        return True

    def can_create_event(self):
        """
        Checks if User can create an event
        """
        perm = UserPermission.query.filter_by(name='create_event').first()
        if not perm:
            return self.is_verified

        if self.is_verified is False:
            return perm.unverified_user

        return True

    def _is_role(self, role_name, event_id=None):
        """
        Checks if a user has a particular Role at an Event.
        """
        from app.models.users_groups_role import UsersGroupsRoles

        role = Role.query.filter_by(name=role_name).first()
        uer = UER.query.filter_by(user=self, role=role)
        ugr = UsersGroupsRoles.query.filter_by(user=self, role=role, accepted=True)
        if event_id:
            uer = uer.filter_by(event_id=event_id)
            event = Event.query.get(event_id)
            # Validation to ensure event is not None
            if event is not None and event.group is not None:
                ugr = ugr.filter_by(group=event.group)
        return bool(uer.first() or ugr.first())

    def is_owner(self, event_id):
        return self._is_role(Role.OWNER, event_id)

    def is_organizer(self, event_id):
        return self._is_role(Role.ORGANIZER, event_id)

    def is_coorganizer(self, event_id):
        return self._is_role(Role.COORGANIZER, event_id)

    def is_track_organizer(self, event_id):
        return self._is_role(TRACK_ORGANIZER, event_id)

    def is_moderator(self, event_id):
        return self._is_role(MODERATOR, event_id)

    def is_registrar(self, event_id):
        return self._is_role(REGISTRAR, event_id)

    def has_event_access(self, event_id):
        return (
            self._is_role(Role.OWNER, event_id)
            or self._is_role(Role.ORGANIZER, event_id)
            or self._is_role(Role.COORGANIZER, event_id)
        )

    @hybrid_property
    def is_user_owner(self):
        return self._is_role(Role.OWNER)

    @hybrid_property
    def is_user_organizer(self):
        return self._is_role(Role.ORGANIZER)

    @hybrid_property
    def is_user_coorganizer(self):
        return self._is_role(Role.COORGANIZER)

    @hybrid_property
    def is_user_track_organizer(self):
        return self._is_role(TRACK_ORGANIZER)

    @hybrid_property
    def is_user_moderator(self):
        return self._is_role(MODERATOR)

    @hybrid_property
    def is_user_registrar(self):
        return self._is_role(REGISTRAR)

    def _has_perm(self, operation, service_class, event_id):
        # Operation names and their corresponding permission in `Permissions`
        operations = {
            'create': 'can_create',
            'read': 'can_read',
            'update': 'can_update',
            'delete': 'can_delete',
        }
        if operation not in list(operations.keys()):
            raise ValueError('No such operation defined')

        try:
            service_name = service_class.get_service_name()
        except AttributeError:
            # If `service_class` does not have `get_service_name()`
            return False

        if self.is_super_admin:
            return True

        service = Service.query.filter_by(name=service_name).first()

        uer_querylist = UER.query.filter_by(user=self, event_id=event_id)
        for uer in uer_querylist:
            role = uer.role
            perm = Permission.query.filter_by(role=role, service=service).first()
            if getattr(perm, operations[operation]):
                return True

        return False

    def can_create(self, service_class, event_id):
        return self._has_perm('create', service_class, event_id)

    def can_read(self, service_class, event_id):
        return self._has_perm('read', service_class, event_id)

    def can_update(self, service_class, event_id):
        return self._has_perm('update', service_class, event_id)

    def can_delete(self, service_class, event_id):
        return self._has_perm('delete', service_class, event_id)

    def is_speaker_at_session(self, session_id):
        try:
            session = (
                Session.query.filter(Session.speakers.any(Speaker.user_id == self.id))
                .filter(Session.id == session_id)
                .one()
            )
            return bool(session)
        except MultipleResultsFound:
            return False
        except NoResultFound:
            return False

    def is_speaker_at_event(self, event_id):
        try:
            session = (
                Session.query.filter(Session.speakers.any(Speaker.user_id == self.id))
                .filter(Session.event_id == event_id)
                .first()
            )
            return bool(session)
        except MultipleResultsFound:
            return False
        except NoResultFound:
            return False

    # Flask-Login integration
    def is_authenticated(self):
        return True

    def is_active(self):
        return True

    def is_anonymous(self):
        return False

    def get_id(self):
        return self.id

    def is_correct_password(self, password):
        salt = self.salt
        password = str(generate_password_hash(password, salt), 'utf-8')
        if password == self._password:
            return True
        return False

    @property
    def is_staff(self):
        return self.is_super_admin or self.is_admin

    def is_sys_role(self, role_id):
        """
        Check if a user has a Custom System Role assigned.
        `role_id` is id of a `CustomSysRole` instance.
        """
        role = UserSystemRole.query.filter_by(user=self, role_id=role_id).first()
        return bool(role)

    def first_access_panel(self):
        """
        Check if the user is assigned a Custom Role or not
        This checks if there is an entry containing the current user in the `user_system_roles` table
        returns panel name if exists otherwise false
        """
        custom_role = UserSystemRole.query.filter_by(user=self).first()
        if not custom_role:
            return False
        perm = PanelPermission.query.filter(
            PanelPermission.custom_system_roles.any(id=custom_role.role_id)
        ).first()
        if not perm:
            return False
        return perm.panel_name

    def can_access_panel(self, panel_name):
        """
        Check if user can access an Admin Panel
        """
        if self.is_staff:
            return True

        custom_sys_roles = UserSystemRole.query.filter_by(user=self)
        for custom_role in custom_sys_roles:
            if custom_role.role.can_access(panel_name):
                return True

        return False

    def get_unread_notif_count(self):
        return get_count(Notification.query.filter_by(user=self, is_read=False))

    def get_unread_notifs(self):
        """
        Get unread notifications with titles, humanized receiving time
        and Mark-as-read links.
        """
        notifs = []
        unread_notifs = Notification.query.filter_by(user=self, is_read=False).order_by(
            desc(Notification.received_at)
        )
        for notif in unread_notifs:
            notifs.append(
                {
                    'title': notif.title,
                    'received_at': humanize.naturaltime(
                        datetime.now(pytz.utc) - notif.received_at
                    ),
                    'mark_read': url_for(
                        'notifications.mark_as_read', notification_id=notif.id
                    ),
                }
            )

        return notifs

    # update last access time
    def update_lat(self):
        self.last_accessed_at = datetime.now()

    # Deprecated
    @property
    def fullname(self):
        return self.full_name

    @property
    def full_name(self):
        return ' '.join(filter(None, [self.first_name, self.last_name]))

    def get_full_billing_address(self, sep: str = '\n') -> str:
        return sep.join(
            filter(
                None,
                [
                    self.billing_address,
                    self.billing_city,
                    self.billing_state,
                    self.billing_zip_code,
                    self.billing_country,
                ],
            )
        )

    full_billing_address = property(get_full_billing_address)

    @property
    def anonymous_name(self):
        return ' '.join(map(lambda x: x.capitalize(), generate(2)))

    @property
    def rocket_chat_username(self):
        name = self.public_name or self.full_name or f'user_{self.id}'
        return slugify(name, word_boundary=True, max_length=32, separator='.')

    @property
    def rocket_chat_password(self):
        return get_serializer().dumps(f'rocket_chat_user_{self.id}')

    @property
    def is_rocket_chat_registered(self) -> bool:
        return self.rocket_chat_token is not None

    def __repr__(self):
        return '<User %r>' % self.email

    def __setattr__(self, name, value):
        if name == 'details':
            super().__setattr__(name, clean_html(clean_up_string(value)))
        else:
            super().__setattr__(name, value)


@event.listens_for(User, 'init')
def receive_init(target, args, kwargs):
    target.signup_at = datetime.now(pytz.utc)