fossasia/open-event-orga-server

View on GitHub
app/api/helpers/mail.py

Summary

Maintainability
D
2 days
Test Coverage
import base64
import datetime
import logging
import os
from itertools import groupby
from typing import Dict, Optional

import pytz
from babel.dates import format_date, format_datetime, format_time
from babel.numbers import format_currency
from flask import current_app, render_template
from sqlalchemy.orm import joinedload

from app.api.helpers.db import save_to_db
from app.api.helpers.files import generate_ics_file, make_frontend_url
from app.api.helpers.log import record_activity
from app.api.helpers.system_mails import MAILS, MailType
from app.api.helpers.utilities import get_serializer, str_generator, string_empty
from app.models.event import Event
from app.models.mail import Mail
from app.models.message_setting import MessageSettings
from app.models.order import OrderTicket
from app.models.ticket_holder import TicketHolder
from app.models.user import User
from app.settings import get_settings

logger = logging.getLogger(__name__)
# pytype: disable=attribute-error


def check_smtp_config(config):
    """
    Checks config of SMTP
    """
    for field in config:
        if field is None:
            return False
    return True


def send_email(to, action, subject, html, attachments=None, bcc=None, reply_to=None):
    """
    Sends email and records it in DB
    """
    from .tasks import get_smtp_config, send_email_task_sendgrid, send_email_task_smtp

    if not MessageSettings.is_enabled(action):
        logger.info("Mail of type %s is not enabled. Hence, skipping...", action)
        return

    if isinstance(to, User):
        logger.warning('to argument should be an email string, not a User object')
        to = to.email

    if string_empty(to):
        logger.warning('Recipient cannot be empty')
        return False
    email_service = get_settings()['email_service']
    email_from_name = get_settings()['email_from_name']
    if email_service == 'smtp':
        email_from = email_from_name + '<' + get_settings()['email_from'] + '>'
    else:
        email_from = get_settings()['email_from']
    payload = {
        'to': to,
        'from': email_from,
        'subject': subject,
        'html': html,
        'attachments': attachments,
        'bcc': bcc,
        'reply_to': reply_to,
    }

    if not (current_app.config['TESTING'] or email_service == 'disable'):
        if email_service == 'smtp':
            smtp_status = check_smtp_config(get_smtp_config())
            if smtp_status:
                send_email_task_smtp.delay(payload)
            else:
                logger.error('SMTP is not configured properly. Cannot send email.')
        elif email_service == 'sendgrid':
            key = get_settings().get('sendgrid_key')
            if key:
                payload['fromname'] = email_from_name
                send_email_task_sendgrid.delay(payload)
            else:
                logger.error('SMTP & sendgrid have not been configured properly')
        else:
            logger.error(
                'Invalid Email Service Setting: %s. Skipping email', email_service
            )
    else:
        logger.warning('Email Service is disabled in settings, so skipping email')

    mail_recorder = current_app.config['MAIL_RECORDER']
    mail_recorder.record(payload)

    mail = Mail(
        recipient=to,
        action=action,
        subject=subject,
        message=html,
    )

    save_to_db(mail, 'Mail Recorded')
    record_activity('mail_event', email=to, action=action, subject=subject)

    return True


def send_email_with_action(user, action, template_name, bcc=None, **kwargs):
    """
    A general email helper to use in the APIs
    :param user: email or user to which email is to be sent
    :param action:
    :param kwargs:
    :return:
    """
    if not MessageSettings.is_enabled(action):
        logger.info("Mail of type %s is not enabled. Hence, skipping...", action)
        return

    if isinstance(user, User):
        user = user.email

    template_path = 'email/' + template_name.lower() + '.html'

    send_email(
        to=user,
        action=action,
        subject=MAILS[action]['subject'].format(**kwargs),
        html=render_template(template_path, **kwargs),
        bcc=bcc,
    )


def send_email_confirmation(email, link):
    """account confirmation"""
    action = MailType.USER_CONFIRM
    app_name = get_settings()['app_name']
    mail = MAILS[action]
    send_email(
        to=email,
        action=action,
        subject=mail['subject'].format(app_name=app_name),
        html=render_template(mail['template'], email=email, link=link),
    )


def send_email_new_session(email, session, speakers):
    """email for new session"""
    app_name = get_settings()['app_name']
    front_page = get_settings()['frontend_url']
    session_overview_link = session.event.organizer_site_link + "/sessions/pending"
    action = MailType.NEW_SESSION
    mail = MAILS[action]
    send_email(
        to=email,
        action=action,
        subject=mail['subject'].format(session=session),
        html=render_template(
            mail['template'],
            session=session,
            speakers=speakers,
            session_overview_link=session_overview_link,
            app_name=app_name,
            front_page=front_page,
        ),
    )


def send_email_ticket_sales_end(event, emails):
    """email for ticket sales end"""
    action = MailType.TICKET_SALES_END
    mail = MAILS[action]
    settings = get_settings()
    tickets = []
    for ticket in event.tickets:
        if ticket.sales_ends_at.date() == (
            datetime.date.today() - datetime.timedelta(days=1)
        ):
            tickets.append(ticket.name)

    ticket_names = ", ".join(tickets)

    event_dashboard = settings['frontend_url'] + '/events/' + event.identifier
    if len(emails) > 0:
        send_email(
            to=emails[0],
            action=action,
            subject=mail['subject'].format(event_name=event.name),
            html=render_template(
                mail['template'],
                settings=settings,
                event_dashboard=event_dashboard,
                event_name=event.name,
                ticket_names=ticket_names,
            ),
            bcc=emails[1:],
            reply_to=emails[-1],
        )


def send_email_ticket_sales_end_tomorrow(event, emails):
    """email for ticket sales end"""
    action = MailType.TICKET_SALES_END_TOMORROW
    mail = MAILS[action]
    settings = get_settings()
    tickets = []
    for ticket in event.tickets:
        if ticket.sales_ends_at.date() == (
            datetime.date.today() - datetime.timedelta(days=-1)
        ):
            tickets.append(ticket.name)

    ticket_names = ", ".join(tickets)

    event_dashboard = settings['frontend_url'] + '/events/' + event.identifier
    if len(emails) > 0:
        send_email(
            to=emails[0],
            action=action,
            subject=mail['subject'].format(event_name=event.name),
            html=render_template(
                mail['template'],
                settings=settings,
                event_dashboard=event_dashboard,
                event_name=event.name,
                ticket_names=ticket_names,
            ),
            bcc=emails[1:],
            reply_to=emails[-1],
        )


def send_email_ticket_sales_end_next_week(event, emails):
    """email for ticket sales end"""
    action = MailType.TICKET_SALES_END_NEXT_WEEK
    mail = MAILS[action]
    settings = get_settings()
    tickets = []
    for ticket in event.tickets:
        if ticket.sales_ends_at.date() == (
            datetime.date.today() - datetime.timedelta(days=-7)
        ):
            tickets.append(ticket.name)

    ticket_names = ", ".join(tickets)

    event_dashboard = settings['frontend_url'] + '/events/' + event.identifier
    if len(emails) > 0:
        send_email(
            to=emails[0],
            action=action,
            subject=mail['subject'].format(event_name=event.name),
            html=render_template(
                mail['template'],
                settings=settings,
                event_dashboard=event_dashboard,
                event_name=event.name,
                ticket_names=ticket_names,
            ),
            bcc=emails[1:],
            reply_to=emails[-1],
        )


def send_email_session_state_change(
    email, session, mail_override: Optional[Dict[str, str]] = None
):
    """email for new session"""
    event = session.event

    settings = get_settings()
    app_name = settings['app_name']
    frontend_url = settings['frontend_url']
    context = {
        'session_name': session.title,
        'session_link': session.site_link,
        'session_cfs_link': session.site_cfs_link,
        'session_state': session.state,
        'event_name': event.name,
        'event_link': event.site_link,
        'app_name': app_name,
        'frontend_link': frontend_url,
    }

    try:
        mail = MAILS[MailType.SESSION_STATE_CHANGE][session.state]
        if mail_override:
            mail = mail.copy()
            mail['subject'] = mail_override.get('subject') or mail['subject']
            mail['message'] = mail_override.get('message') or mail['message']
            mail['bcc'] = mail_override.get('bcc', [])
    except KeyError:
        logger.error('No mail found for session state change: ' + session.state)
        return

    organizers_email = list(
        map(
            lambda x: x.email,
            session.event.organizers + session.event.coorganizers + [session.event.owner],
        )
    )
    bcc = list(set(organizers_email + mail.get('bcc', [])))
    if email in bcc:
        bcc.remove(email)  # to, cc, bcc should have unique emails

    send_email(
        to=email,
        action=MailType.SESSION_STATE_CHANGE,
        subject=mail['subject'].format(**context),
        html=mail['message'].format(**context),
        bcc=bcc,
        reply_to=session.event.owner.email,
    )


def send_email_role_invite(email, role_name, event_name, link):
    """email for role invite"""
    action = MailType.EVENT_ROLE
    mail = MAILS[action]
    send_email(
        to=email,
        action=action,
        subject=mail['subject'].format(role=role_name, event=event_name),
        html=render_template(
            mail['template'],
            email=email,
            role=role_name,
            event=event_name,
            link=link,
        ),
    )


def send_email_speaker_invite(email, session, cfs_link, inviter):
    """email for speaker invite"""
    action = MailType.SPEAKER_INVITE
    app_name = get_settings()['app_name']
    mail = MAILS[action]
    send_email(
        to=email,
        action=action,
        subject=mail['subject'].format(session=session.title),
        html=render_template(
            mail['template'],
            session_title=session.title,
            event_name=session.event.name,
            event_link=session.event.site_link,
            app_name=app_name,
            frontend_url=get_settings()['frontend_url'],
            inviter_email=inviter.email,
            inviter_name=inviter.name,
            cfs_link=cfs_link,
        ),
    )


def send_email_group_role_invite(email, role_name, group_name, link):
    """email for role invite"""
    action = MailType.GROUP_ROLE
    mail = MAILS[action]
    send_email(
        to=email,
        action=action,
        subject=mail['subject'].format(role=role_name, group=group_name),
        html=render_template(
            mail['template'],
            email=email,
            role=role_name,
            group=group_name,
            link=link,
        ),
    )


def send_email_announce_event(event, group, emails):
    """email for announce event"""

    action = MailType.ANNOUNCE_EVENT
    mail = MAILS[action]

    event_name = event.name
    group_name = group.name
    event_date = event.starts_at
    event_description = event.description
    event_url = event.site_link
    event_location = event.normalized_location
    event_time = event.starts_at
    group_url = group.view_page_link
    app_name = get_settings()['app_name']

    if len(emails) > 0:
        for email in emails:
            send_email(
                to=email,
                action=action,
                subject=mail['subject'].format(
                    event_name=event_name,
                    group_name=group_name,
                    event_date=convert_to_user_locale(email, date=event_date),
                ),
                html=render_template(
                    mail['template'],
                    event_name=event_name,
                    event_description=event_description,
                    event_url=event_url,
                    event_location=event_location,
                    event_date=convert_to_user_locale(email, date=event_date),
                    event_time=convert_to_user_locale(
                        email, time=event_time, tz=event.timezone
                    ),
                    group_name=group_name,
                    group_url=group_url,
                    app_name=app_name,
                ),
            )


def send_email_for_monthly_fee_payment(
    user, event_name, previous_month, amount, app_name, link, follow_up=False
):
    """email for monthly fee payment"""
    options = {
        False: MailType.MONTHLY_PAYMENT,
        True: MailType.MONTHLY_PAYMENT_FOLLOWUP,
        'pre_due': MailType.MONTHLY_PAYMENT_PRE_DUE,
        'post_due': MailType.MONTHLY_PAYMENT_POST_DUE,
    }
    key = options[follow_up]
    mail = MAILS[key]
    email = user.email
    invoice_url = make_frontend_url('/account/billing/invoices')
    send_email(
        to=email,
        action=key,
        subject=mail['subject'].format(
            date=previous_month, event_name=event_name, app_name=app_name
        ),
        html=render_template(
            mail['template'],
            name=user.full_name,
            email=email,
            event_name=event_name,
            invoice_url=invoice_url,
            date=previous_month,
            amount=amount,
            app_name=app_name,
            payment_url=link,
        ),
        bcc=get_settings()['admin_billing_email'],
    )


def send_export_mail(email, event_name, error_text=None, download_url=None):
    """followup export link in email"""
    if error_text:
        action = MailType.EVENT_EXPORT_FAIL
        mail = MAILS[action]
        send_email(
            to=email,
            action=action,
            subject=mail['subject'].format(event_name=event_name),
            html=render_template(mail['template'], error_text=error_text),
        )
    elif download_url:
        action = MailType.EVENT_EXPORTED
        mail = MAILS[action]
        send_email(
            to=email,
            action=action,
            subject=mail['subject'].format(event_name=event_name),
            html=render_template(mail['template'], download_url=download_url),
        )


def send_import_mail(email, event_name=None, error_text=None, event_url=None):
    """followup export link in email"""
    if error_text:
        action = MailType.EVENT_IMPORT_FAIL
        mail = MAILS[action]
        send_email(
            to=email,
            action=action,
            subject=mail['subject'],
            html=render_template(mail['template'], error_text=error_text),
        )
    elif event_url:
        action = MailType.EVENT_IMPORTED
        mail = MAILS[action]
        send_email(
            to=email,
            action=action,
            subject=mail['subject'].format(event_name=event_name),
            html=render_template(mail['template'], event_url=event_url),
        )


def send_test_email(recipient):
    send_email(
        to=recipient,
        action=MailType.TEST_MAIL,
        subject=MAILS[MailType.TEST_MAIL]['subject'],
        html=MAILS[MailType.TEST_MAIL]['message'],
    )


def send_email_change_user_email(user, email):
    serializer = get_serializer()
    hash_ = str(
        base64.b64encode(bytes(serializer.dumps([email, str_generator()]), 'utf-8')),
        'utf-8',
    )
    app_name = get_settings()['app_name']
    link = make_frontend_url('/email/verify', {'token': hash_})
    send_email_with_action(
        user.email,
        MailType.USER_CONFIRM,
        'user_confirm',
        email=user.email,
        link=link,
        app_name=app_name,
    )
    send_email_with_action(
        email,
        MailType.USER_CHANGE_EMAIL,
        'user_change_email',
        email=email,
        new_email=user.email,
    )


def send_email_to_attendees(order):
    attachments = None
    if current_app.config['ATTACH_ORDER_PDF']:
        attachments = [order.ticket_pdf_path, order.invoice_pdf_path]

    event = order.event
    ical_file_path = generate_ics_file(event.id, include_sessions=False)

    if os.path.exists(ical_file_path):
        if attachments is None:
            attachments = [ical_file_path]
        else:
            attachments.append(ical_file_path)

    attendees = (
        TicketHolder.query.options(
            joinedload(TicketHolder.ticket), joinedload(TicketHolder.user)
        )
        .filter_by(order_id=order.id)
        .all()
    )
    email_group = groupby(attendees, lambda a: a.email)

    buyer_email = order.user.email
    event_date = convert_to_user_locale(
        buyer_email,
        date=order.event.starts_at,
    )
    event_end_date = convert_to_user_locale(
        buyer_email,
        date_time=order.event.ends_at,
        tz=order.event.timezone,
    )
    event_time = convert_to_user_locale(
        buyer_email,
        time=order.event.starts_at,
        tz=order.event.timezone,
    )

    context = dict(
        order=order,
        starts_at=convert_to_user_locale(
            buyer_email,
            date_time=order.event.starts_at,
            tz=order.event.timezone,
        ),
        ends_at=event_end_date,
        event_time=event_time,
        settings=get_settings(),
        order_view_url=order.site_view_link,
    )

    action = MailType.TICKET_PURCHASED
    mail = MAILS[action]
    send_email(
        to=buyer_email,
        action=action,
        subject=mail['subject'].format(
            event_name=event.name,
            invoice_id=order.invoice_number,
            event_date=event_date,
            event_time=event_time,
        ),
        html=render_template(mail['template'], attendees=attendees, **context),
        attachments=attachments,
    )

    action = MailType.TICKET_PURCHASED_ATTENDEE
    mail = MAILS[action]
    for email, attendees_group in email_group:
        if email == buyer_email:
            # Ticket holder is the purchaser
            continue

        # The Ticket holder is not the purchaser
        send_email(
            to=email,
            action=action,
            subject=mail['subject'].format(
                event_name=event.name,
                invoice_id=order.invoice_number,
                event_date=event_date,
                event_time=event_time,
            ),
            html=render_template(
                mail['template'],
                attendees=list(attendees_group),
                **context,
            ),
            attachments=attachments,
        )


def send_order_purchase_organizer_email(order, recipients):

    order_tickets = OrderTicket.query.filter_by(order_id=order.id).all()
    emails = list({organizer.email for organizer in recipients})
    print(emails[0])

    context = dict(
        buyer_email=order.user.email,
        buyer_name=order.user.full_name,
        event_name=order.event.name,
        invoice_id=order.invoice_number,
        frontend_url=get_settings()['frontend_url'],
        site_link=order.event.site_link,
        order_url=order.site_view_link,
        event_date=convert_to_user_locale(
            order.user.email,
            date=order.event.starts_at,
        ),
        event_time=convert_to_user_locale(
            order.user.email,
            time=order.event.starts_at,
            tz=order.event.timezone,
        ),
        timezone=order.event.timezone,
        purchase_time=convert_to_user_locale(
            emails[0], date_time=order.completed_at, tz=order.event.timezone
        ),
        payment_mode=order.payment_mode,
        payment_status=order.status,
        order_amount=convert_to_user_locale(
            emails[0], amount=order.amount, currency=order.event.payment_currency
        ),
        tickets_count=order.tickets_count,
        order_tickets=order_tickets,
        buyer_org=order.company,
        buyer_address=order.address,
        buyer_zipcode=order.zipcode,
        buyer_city=order.city,
        buyer_state=order.state,
        buyer_country=order.country,
        buyer_tax_id=order.tax_business_info,
        app_name=get_settings()['app_name'],
    )
    if emails:
        send_email_with_action(
            emails[0],
            MailType.TICKET_PURCHASED_ORGANIZER,
            'ticket_purchased_organizer',
            bcc=emails[1:],
            **context,
        )


def send_order_cancel_email(order):
    cancel_msg = ''
    if order.cancel_note:
        cancel_msg = "<br/>Message from the organizer: {cancel_note}".format(
            cancel_note=order.cancel_note
        )

    order_url = (
        get_settings()['frontend_url'] + '/orders/' + str(order.identifier) + '/view/'
    )
    event_url = get_settings()['frontend_url'] + '/e/' + order.event.identifier

    action = MailType.TICKET_CANCELLED
    mail = MAILS[action]
    send_email(
        to=order.user.email,
        action=action,
        subject=mail['subject'].format(
            event_name=order.event.name,
            invoice_id=order.invoice_number,
        ),
        html=render_template(
            mail['template'],
            event_name=order.event.name,
            order_url=order_url,
            event_url=event_url,
            cancel_msg=cancel_msg,
            app_name=get_settings()['app_name'],
        ),
    )


def send_password_change_email(user):
    action = MailType.PASSWORD_CHANGE
    mail = MAILS[action]
    send_email(
        to=user.email,
        action=action,
        subject=mail['subject'].format(app_name=get_settings()['app_name']),
        html=render_template(mail['template']),
    )


def send_password_reset_email(user):
    link = make_frontend_url('/reset-password', {'token': user.reset_password})
    action = (
        MailType.PASSWORD_RESET_AND_VERIFY
        if user.was_registered_with_order
        else MailType.PASSWORD_RESET
    )
    mail = MAILS[action]
    send_email(
        to=user.email,
        action=action,
        subject=mail['subject'].format(app_name=get_settings()['app_name']),
        html=render_template(
            mail['template'],
            link=link,
            settings=get_settings(),
            token=user.reset_password,
        ),
    )


def send_user_register_email(user):
    s = get_serializer()
    hash = str(
        base64.b64encode(str(s.dumps([user.email, str_generator()])).encode()),
        'utf-8',
    )
    link = make_frontend_url('/verify', {'token': hash})
    settings = get_settings()
    action = MailType.USER_REGISTER
    mail = MAILS[action]
    send_email(
        to=user.email,
        action=action,
        subject=mail['subject'].format(app_name=settings['app_name']),
        html=render_template(
            mail['template'],
            email=user.email,
            link=link,
            settings=get_settings(),
        ),
    )


def send_email_to_moderator(video_stream_moderator):
    action = MailType.VIDEO_MODERATOR_INVITE
    mail = MAILS[action]
    event = Event.query.get(video_stream_moderator.video_stream._event_id)
    send_email(
        to=video_stream_moderator.email,
        action=action,
        subject=mail['subject'].format(
            video_name=video_stream_moderator.video_stream.name, event_name=event.name
        ),
        html=render_template(
            mail['template'],
            registration_url=make_frontend_url('/register'),
            event_name=event.name,
            video_stream_name=video_stream_moderator.video_stream.name,
            user=video_stream_moderator.user,
            settings=get_settings(),
        ),
    )


def send_email_after_event(email, event_name):
    app_name = get_settings()['app_name']
    action = MailType.AFTER_EVENT
    mail = MAILS[action]

    send_email(
        to=email,
        action=action,
        subject=mail['subject'].format(event_name=event_name, app_name=app_name),
        html=render_template(
            mail['template'],
            app_name=app_name,
            email=email,
            eventname=event_name,
        ),
    )


def send_email_after_event_speaker(email, event_name):
    action = MailType.AFTER_EVENT_SPEAKER
    mail = MAILS[action]

    send_email(
        to=email,
        action=action,
        subject=mail['subject'].format(event_name=event_name),
        html=render_template(
            mail['template'],
            eventname=event_name,
            email=email,
        ),
    )


def convert_to_user_locale(
    email, date_time=None, date=None, time=None, tz=None, amount=None, currency=None
):
    """
    Convert date and time and amount to user selected language
    :return        : datetime/date/time translated to user locale
    """
    if email:
        user = User.query.filter(User.email == email).first()
        user_locale = 'en'

        if user and user.language_prefrence:
            user_locale = user.language_prefrence

        if amount and currency:
            return format_currency(amount, currency, locale=user_locale)

        if date_time and tz:
            return format_datetime(
                date_time, 'full', tzinfo=pytz.timezone(tz), locale=user_locale
            )

        if date:
            return format_date(date, 'd MMMM Y', locale=user_locale)

        if time and tz:
            return format_time(
                time, 'HH:mm (zzzz)', tzinfo=pytz.timezone(tz), locale=user_locale
            )

    return None