fossasia/open-event-orga-server

View on GitHub
app/api/helpers/user_check_in.py

Summary

Maintainability
A
3 hrs
Test Coverage
import csv
import os
import uuid
from datetime import datetime, timedelta

from flask import current_app

from app.api.helpers.errors import BadRequestError, UnprocessableEntityError
from app.api.helpers.static import STATION_TYPE
from app.api.helpers.storage import UPLOAD_PATHS, UploadedFile, generate_hash, upload
from app.models.session import Session
from app.models.station import Station
from app.models.user_check_in import UserCheckIn


def export_csv(data):
    if data is None:
        raise BadRequestError({'source': data}, 'Bad Request Error')

    query_ = UserCheckIn.query.join(Station)
    if 'session_id' in data and data['session_id']:
        query_ = query_.filter(UserCheckIn.session_id == data['session_id'])
    if 'date' in data and data['date']:
        dt = datetime.strptime(data['date'], '%Y-%m-%d')
        start = dt - timedelta(0)
        end = start + timedelta(days=1)
        query_ = query_.filter(UserCheckIn.created_at >= start).filter(
            UserCheckIn.created_at < end
        )
    if 'track_type' in data and data['track_type']:
        query_ = query_.filter(UserCheckIn.track_name == data['track_type'])
    if 'type' in data and data['type']:
        if '&' in data['type']:
            query_ = query_.filter(
                Station.station_type.in_(x.strip() for x in data['type'].split('&'))
            )
        else:
            query_ = query_.filter(Station.station_type == data['type'])
    userCheckIns = query_.order_by(UserCheckIn.created_at.desc()).all()

    try:
        filedir = os.path.join(current_app.config.get('BASE_DIR'), 'static/uploads/temp/')
        if not os.path.isdir(filedir):
            os.makedirs(filedir)

        identifier = uuid.uuid1().hex
        filename = f"user-check-in-{identifier}.csv"
        file_path = os.path.join(filedir, filename)

        with open(file_path, "w") as temp_file:
            writer = csv.writer(temp_file)
            content = create_file_csv(userCheckIns)
            for row in content:
                writer.writerow(row)
        csv_file = UploadedFile(file_path=file_path, filename=filename)
        uploadPath = UPLOAD_PATHS['exports-temp']['csv'].format(
            event_id='admin', identifier=identifier
        )
        upload(csv_file, uploadPath)
        os.remove(filedir + filename)
        return f'static/media/{uploadPath}/{generate_hash(uploadPath)}/{filename}'
    except Exception as e:
        raise BadRequestError({'source': e}, 'Bad Request Error')


def create_file_csv(userCheckIns):
    headers = [
        'Ticket Id',
        'Date Time',
        'Track Name',
        'Session Name',
        'Speaker Name',
        'Type',
    ]

    columns = [
        'ticket_holder_id',
        'created_at',
        'track_name',
        'session_name',
        'speaker_name',
        'type',
    ]
    rows = [headers]
    for userCheckIn in userCheckIns:
        data = []
        for column in columns:
            if column == 'type':
                data.append(userCheckIn.station.station_type)
                continue
            if column == 'created_at':
                data.append(userCheckIn.created_at.strftime('%Y-%m-%d %H:%M:%S'))
                continue
            data.append(getattr(userCheckIn, column))
        rows.append(data)

    return rows


def validate_microlocation(station: Station, session: Session):
    """
    validate if microlocation matches
    @param station:
    @param session:
    """
    if station.microlocation_id != session.microlocation_id:
        raise UnprocessableEntityError(
            {
                'station microlocation': station.microlocation_id,
                'session microlocation': session.microlocation_id,
            },
            "Location of your session not matches with station location"
            ", please check with the organizer.",
        )
    if station.event_id != session.event_id:
        raise UnprocessableEntityError(
            {
                'station event_id': station.event_id,
                'session event_id': session.event_id,
            },
            "Session not belong to this event.",
        )


def validate_check_in_out_status(station: Station, attendee_data: UserCheckIn):
    """
    validate if attendee already check in/out
    @param station:
    @param attendee_data:
    """
    if attendee_data:
        if (
            attendee_data.station.station_type == station.station_type
            and station.station_type == STATION_TYPE.get('check in')
        ):
            raise UnprocessableEntityError(
                {
                    'attendee': attendee_data.ticket_holder_id,
                    'session ': attendee_data.session_id,
                },
                "Attendee already checked in.",
            )
        if (
            attendee_data.station.station_type == station.station_type
            and station.station_type == STATION_TYPE.get('check out')
        ):
            raise UnprocessableEntityError(
                {
                    'attendee': attendee_data.ticket_holder_id,
                    'session ': attendee_data.session_id,
                },
                "Attendee not check in yet.",
            )
    else:
        if station.station_type == STATION_TYPE.get('check out'):
            raise UnprocessableEntityError(
                {
                    'attendee': attendee_data.ticket_holder_id,
                    'session ': attendee_data.session_id,
                },
                "Attendee not check in yet.",
            )