fossasia/open-event-orga-server

View on GitHub
app/api/helpers/export_helpers.py

Summary

Maintainability
D
1 day
Test Coverage
import json
import os
import shutil
from collections import OrderedDict
from datetime import datetime

import pytz
import requests
from flask import current_app as app
from flask import request, url_for
from flask_jwt_extended import current_user
from flask_login import current_user as current_logged_user

from app.api.helpers.db import save_to_db
from app.api.helpers.storage import UPLOAD_PATHS, UploadedFile, upload
from app.api.helpers.utilities import get_filename_from_cd, is_downloadable
from app.models import db
from app.models.custom_form import CustomForms
from app.models.event import Event
from app.models.export_job import ExportJob
from app.models.microlocation import Microlocation
from app.models.session import Session
from app.models.session_type import SessionType
from app.models.speaker import Speaker
from app.models.sponsor import Sponsor
from app.models.track import Track

# order of keys in export json
FIELD_ORDER = {
    'event': [
        'id',
        'name',
        'latitude',
        'longitude',
        'location_name',
        'starts_at',
        'ends_at',
        'timezone',
        'description',
        'original_image_url',
        'logo_url',
        'owner_name',
        'owner_description',
        'external_event_url',
        'ticket_url',
        'privacy',
        'event_type_id',
        'event_topic_id',
        'event_sub_topic_id',
        'code_of_conduct',
    ],
    'microlocations': ['id', 'name', 'floor'],
    'sessions': [
        'id',
        'title',
        'subtitle',
        'short_abstract',
        'long_abstract',
        'starts_at',
        'ends_at',
        'session_type_id',
        'track_id',
        'comments',
        'language',
        'slides_url',
        'audio_url',
        'video_url',
    ],
    'speakers': [
        'id',
        'name',
        'email',
        'mobile',
        'photo_url',
        'organisation',
        'position',
        'country',
        'short_biography',
        'long_biography',
        'website',
        'twitter',
        'facebook',
        'github',
        'linkedin',
    ],
    'sponsors': ['id', 'name', 'logo_url', 'level', 'type', 'url', 'description'],
    'tracks': ['id', 'name', 'color', 'font_color'],
    'session_types': ['id', 'name', 'length'],
    'forms': [],
}

# keep sync with storage.UPLOAD_PATHS
DOWNLOAD_FIEDLS = {
    'sessions': {
        'video_url': ['video', '/videos/session_%d'],
        'audio_url': ['audio', '/audios/session_%d'],
        'slides_url': ['document', '/slides/session_%d'],
    },
    'speakers': {'photo_url': ['image', '/images/speakers/%s_%d']},
    'event': {
        'logo_url': ['image', '/images/logo'],
        'external_event_url': ['image', '/images/background'],
    },
    'sponsors': {'logo_url': ['image', '/images/sponsors/%s_%d']},
}

DATE_FIELDS = ['starts_at', 'ends_at', 'created_at', 'deleted_at', 'submitted_at']

EXPORTS = [
    ('event', Event),
    ('microlocations', Microlocation),
    ('sessions', Session),
    ('speakers', Speaker),
    ('sponsors', Sponsor),
    ('tracks', Track),
    ('session_types', SessionType),
    ('forms', CustomForms),
]

# strings to remove in a filename
FILENAME_EXCLUDE = r'<>:"/\|?*;'


# FUNCTIONS


def sorted_dict(data):
    """
    sorts a json (dict/list->dict) and returns OrderedDict
    """
    if type(data) is OrderedDict:
        data = dict(data)
    if type(data) is dict:
        data = OrderedDict(sorted(list(data.items()), key=lambda t: t[0]))
    elif type(data) is list:
        for count in range(len(data)):
            data[count] = OrderedDict(
                sorted(list(data[count].items()), key=lambda t: t[0])
            )
    return data


def _order_json(data, srv):
    """
    sorts the data a/c FIELD_ORDER and returns.
    If some keys are not included in FIELD_ORDER, they go at last, sorted alphabetically
    """
    new_data = OrderedDict()
    data.pop('_sa_instance_state', None)
    for field in FIELD_ORDER[srv[0]]:
        if field in DATE_FIELDS and data[field] and type(data[field]) != str:
            new_data[field] = sorted_dict(data[field].isoformat())
        elif field == 'font_color' and 'id' in new_data:
            track = db.session.query(Track).filter(Track.id == new_data['id']).first()
            new_data[field] = track.font_color
        else:
            new_data[field] = sorted_dict(data[field])
        data.pop(field, None)

    # remaining fields, sort and add
    # https://docs.python.org/2/library/collections.html#collections.OrderedDict
    data = OrderedDict(sorted(list(data.items()), key=lambda t: t[0]))
    for key in data:
        if key in DATE_FIELDS and data[key] and type(data[key]) != str:
            new_data[key] = sorted_dict(data[key].isoformat())
        else:
            new_data[key] = sorted_dict(data[key])

    return new_data


def _download_media(data, srv, dir_path, settings):
    """
    Downloads the media and saves it
    """
    if srv not in DOWNLOAD_FIEDLS:
        return
    for i in DOWNLOAD_FIEDLS[srv]:
        if not data[i]:
            continue
        if not settings[DOWNLOAD_FIEDLS[srv][i][0]]:
            continue
        path = DOWNLOAD_FIEDLS[srv][i][1]
        if srv == 'speakers':
            path %= make_filename(data['name']), data['id']
        elif srv == 'sponsors':
            path %= make_filename(data['name']), data['id']
        elif srv != 'event':
            path = path % (data['id'])
        if data[i].find('.') > -1:  # add extension
            ext = data[i].rsplit('.', 1)[1]
            if ext.find('/') == -1:
                path += '.' + ext
        full_path = dir_path + path
        # make dir
        cdir = full_path.rsplit('/', 1)[0]
        if not os.path.isdir(cdir):
            os.makedirs(cdir)
        # download and set
        url = data[i]
        if not is_downloadable(url):
            continue
        try:
            r = requests.get(url, allow_redirects=True)
            ext = get_filename_from_cd(r.headers.get('content-disposition'))[1]
            full_path += ext
            path += ext
            open(full_path, 'wb').write(r.content)
            data[i] = path
        except Exception:
            pass


def _generate_meta():
    """
    Generate Meta information for export
    """
    d = {'root_url': request.url_root}
    return d


def export_event_json(event_id, settings):
    """
    Exports the event as a zip on the server and return its path
    """
    # make directory
    exports_dir = app.config['BASE_DIR'] + '/static/uploads/exports/'
    if not os.path.isdir(exports_dir):
        os.makedirs(exports_dir)
    dir_path = exports_dir + 'event%d' % int(event_id)
    if os.path.isdir(dir_path):
        shutil.rmtree(dir_path, ignore_errors=True)
    os.makedirs(dir_path)
    # save to directory
    for e in EXPORTS:
        if e[0] == 'event':
            query_obj = db.session.query(e[1]).filter(e[1].id == event_id).first()
            data = _order_json(dict(query_obj.__dict__), e)
            _download_media(data, 'event', dir_path, settings)
        else:
            query_objs = db.session.query(e[1]).filter(e[1].event_id == event_id).all()
            data = [_order_json(dict(query_obj.__dict__), e) for query_obj in query_objs]
            for count in range(len(data)):
                data[count] = _order_json(data[count], e)
                _download_media(data[count], e[0], dir_path, settings)
        data_str = json.dumps(
            data, indent=4, ensure_ascii=False, default=handle_unserializable_data
        ).encode('utf-8')
        fp = open(dir_path + '/' + e[0], 'w')
        fp.write(str(data_str, 'utf-8'))
        fp.close()
    # add meta
    data_str = json.dumps(
        _generate_meta(), sort_keys=True, indent=4, ensure_ascii=False
    ).encode('utf-8')
    fp = open(dir_path + '/meta', 'w')
    fp.write(str(data_str, 'utf-8'))
    fp.close()
    # make zip
    shutil.make_archive(dir_path, 'zip', dir_path)
    dir_path = dir_path + ".zip"

    storage_path = UPLOAD_PATHS['exports']['zip'].format(event_id=event_id)
    uploaded_file = UploadedFile(dir_path, dir_path.rsplit('/', 1)[1])
    storage_url = upload(uploaded_file, storage_path)

    return storage_url


def get_current_user():
    if current_user:
        return current_user
    return current_logged_user


# HELPERS


def create_export_job(task_id, event_id):
    """
    Create export job for an export that is going to start
    """
    export_job = ExportJob.query.filter_by(event_id=event_id).first()
    task_url = url_for('tasks.celery_task', task_id=task_id)
    current_logged_user = get_current_user()

    if export_job:

        export_job.task = task_url
        export_job.user_email = current_logged_user.email
        export_job.event = Event.query.get(event_id)
        export_job.starts_at = datetime.now(pytz.utc)
    else:
        export_job = ExportJob(
            task=task_url,
            user_email=current_logged_user.email,
            event=Event.query.get(event_id),
        )
    save_to_db(export_job, 'ExportJob saved')


# FIELD DATA FORMATTERS
def make_filename(name):
    """Make speaker image filename for export"""
    for _ in FILENAME_EXCLUDE:
        name = name.replace(_, ' ')
    return ''.join(s.title() for s in name.split() if s)


def handle_unserializable_data(obj):
    """
    Handles objects which cannot be serialized by json.dumps()
    :param obj: Object to be serialized
    :return: JSON representation of the object
    """
    if isinstance(obj, datetime):
        return obj.__str__()


def create_export_badge_job(task_id, event_id, attendee_id):
    """Create export job for an export that is going to start"""
    export_job = ExportJob.query.filter_by(
        event_id=event_id, attendee_id=attendee_id
    ).first()
    task_url = url_for('tasks.celery_task', task_id=task_id)
    logged_user = get_current_user()

    if export_job:

        export_job.task = task_url
        export_job.user_email = logged_user.email
        export_job.attendee_id = attendee_id
        export_job.event = Event.query.get(event_id)
        export_job.starts_at = datetime.now(pytz.utc)
    else:
        export_job = ExportJob(
            task=task_url,
            user_email=logged_user.email,
            attendee_id=attendee_id,
            event=Event.query.get(event_id),
        )
    save_to_db(export_job, 'ExportJob saved')


def comma_separated_params_to_list(param):
    """
    convert string to list separated by comma
    @param param: string to be separates
    @return: array string
    """
    return list(filter(lambda x: x and x is not None, param.split(',')))