sipa/blueprints/usersuite.py

Summary

Maintainability
D
1 day
Test Coverage
"""Blueprint for Usersuite components
"""
from collections import OrderedDict
import logging
from datetime import datetime
from io import BytesIO

from babel.numbers import format_currency
from flask import (
    Blueprint,
    render_template,
    url_for,
    redirect,
    flash,
    abort,
    request,
    current_app,
    send_file,
)
from flask_babel import format_date, gettext
from flask_login import current_user, login_required
from flask_wtf import FlaskForm
from markupsafe import Markup

from sipa.forms import ContactForm, ChangeMACForm, ChangeMailForm, \
    ChangePasswordForm, flash_formerrors, HostingForm, \
    PaymentForm, ActivateNetworkAccessForm, TerminateMembershipForm, \
    TerminateMembershipConfirmForm, ContinueMembershipForm
from sipa.mail import send_usersuite_contact_mail
from sipa.model.fancy_property import ActiveProperty
from sipa.utils import password_changeable, subscribe_to_status_page
from sipa.model.exceptions import (
    PasswordInvalid,
    UserNotFound,
    MacAlreadyExists,
    TerminationNotPossible,
    UnknownError,
    ContinuationNotPossible,
    SubnetFull,
)
from sipa.model.misc import PaymentDetails

logger = logging.getLogger(__name__)

bp_usersuite = Blueprint('usersuite', __name__, url_prefix='/usersuite')


def capability_or_403(active_property, capability):
    prop: ActiveProperty = getattr(current_user, active_property)
    if not getattr(prop.capabilities, capability):
        abort(403)


@bp_usersuite.route("/", methods=['GET', 'POST'])
@login_required
def index():
    """Usersuite landing page with user account information
    and traffic overview.
    """
    info = current_user.finance_information
    last_update = info.last_update if info else None
    last_received_update = info.last_received_update if info else None
    finance_update_string = (
        " ({}: {})".format(gettext("Stand"),
                           format_date(last_update, 'short', rebase=False))
        if last_update
        else ""
    )
    finance_received_string = (
        format_date(last_received_update, "short", rebase=False)
        if last_received_update
        else ""
    )
    descriptions = OrderedDict(
        [
            ("id", [gettext("Nutzer-ID")]),
            ("realname", [gettext("Voller Name")]),
            ("login", [gettext("Accountname")]),
            ("status", [gettext("Mitgliedschaftsstatus")]),
            ("address", [gettext("Aktuelles Zimmer")]),
            ("ips", [gettext("Aktuelle IP-Adresse")]),
            (
                "mac",
                [
                    gettext("Aktuelle MAC-Adresse"),
                    gettext("Die MAC Adresse des per Kabel verbundenen Gerätes"),
                ],
            ),
            ("mail", [gettext("E-Mail-Adresse")]),
            ("mail_confirmed", [gettext("Status deiner E-Mail-Adresse")]),
            ("mail_forwarded", [gettext("E-Mail-Weiterleitung")]),
            ("wifi_password", [gettext("WLAN Passwort")]),
            # ('hostname', gettext("Hostname")),
            # ('hostalias', gettext("Hostalias")),
            ("userdb_status", [gettext("MySQL Datenbank")]),
            (
                "finance_balance",
                [
                    gettext("Kontostand") + finance_update_string,
                    gettext("Eingegangene Zahlung") + ": " + finance_received_string,
                ],
            ),
        ]
    )

    rows = list(current_user.generate_rows(descriptions))
    payment_form = PaymentForm()
    if payment_form.validate_on_submit():
        months = payment_form.months.data
    else:
        months = payment_form.months.default
        flash_formerrors(payment_form)

    datasource = current_user.datasource
    context = dict(rows=rows,
                   webmailer_url=datasource.webmailer_url,
                   terminate_membership_url=url_for('.terminate_membership'),
                   continue_membership_url=url_for('.continue_membership'),
                   payment_details=render_payment_details(current_user.payment_details(),
                                                          months),
                   girocode=generate_epc_qr_code(current_user.payment_details(), months))

    if current_user.has_connection:
        context.update(
            show_traffic_data=True,
            traffic_user=current_user,
        )

    if info and info.has_to_pay:
        context.update(
            show_transaction_log=True,
            last_update=info.last_update,
            balance=info.balance.raw_value,
            logs=info.history,
        )

    return render_template("usersuite/index.html", payment_form=payment_form, **context)


@bp_usersuite.route("/contact", methods=['GET', 'POST'])
@login_required
def contact():
    """Contact form for logged in users.
    Currently sends an e-mail to the support e-mail address
    '[Usersuite] Category: Subject' with userid and message.
    """
    form = ContactForm()

    if form.validate_on_submit():
        types = {
            'stoerung': "Störung",
            'finanzen': "Finanzen",
            'eigene-technik': "Eigene Technik"
        }

        success = send_usersuite_contact_mail(
            author=form.email.data,
            category=types.get(form.type.data, "Allgemein"),
            subject=form.subject.data,
            message=form.message.data
        )

        if success:
            flash(gettext("Nachricht wurde versandt."), "success")
        else:
            flash(gettext("Es gab einen Fehler beim Versenden der Nachricht. "
                          "Bitte schicke uns direkt eine E-Mail an {}")
                          .format(current_user.datasource.support_mail),
                  'error')
        return redirect(url_for('.index'))
    elif form.is_submitted():
        flash_formerrors(form)

    form.email.default = current_user.mail.raw_value

    return render_template("usersuite/contact.html",
                           form_args={'form': form,
                                      'reset_button': True,
                                      'cancel_to': url_for('.index')})


@bp_usersuite.route("/subscribe", methods=['GET'])
@login_required
def subscribe():
    """Route to subscribe to statuspage"""

    email = current_user.mail.raw_value
    if email == "":
        email = f"{current_user.login.raw_value}@agdsn.me"

    result = subscribe_to_status_page(
        current_app.config['STATUS_PAGE_API_SUBSCRIBE_ENDPOINT'],
        current_app.config['STATUS_PAGE_API_TOKEN'],
        current_app.config['STATUS_PAGE_REQUEST_TIMEOUT'],
        email,
    )
    if result is None:
        flash(gettext("Es ist ein Fehler aufgetreten!"), "error")
    elif result:
        flash(gettext("Deine E-Mail Adresse ({}) wurde zur Status-Page hinzugefügt. Du bekommst "
                      "eine E-Mail mit weiteren Details.").format(email), "success")
    else:
        flash(gettext("Du hast die Statuspage bereits abonniert."), "warning")

    return redirect(url_for('.index'))


def render_payment_details(details: PaymentDetails, months):
    return {
        gettext("Zahlungsempfänger"): details.recipient,
        gettext("Bank"): details.bank,
        gettext("IBAN"): details.iban,
        gettext("BIC"): details.bic,
        gettext("Verwendungszweck"): details.purpose,
        gettext("Betrag"): format_currency(months * current_app.config['MEMBERSHIP_CONTRIBUTION'] / 100, 'EUR',
                                           locale='de_DE')
    }


def generate_epc_qr_code(details: PaymentDetails, months):
    # generate content for epc-qr-code (also known as giro-code)
    EPC_FORMAT = \
        "BCD\n001\n1\nSCT\n{bic}\n{recipient}\n{iban}\nEUR{amount}\n\n\n{purpose}\n\n"

    return EPC_FORMAT.format(
        bic=details.bic.replace(' ', ''),
        recipient=details.recipient,
        iban=details.iban.replace(' ', ''),
        amount=months * current_app.config['MEMBERSHIP_CONTRIBUTION'] / 100,
        purpose=details.purpose)


def get_attribute_endpoint(attribute, capability='edit'):
    """Try to determine the flask endpoint for the according property."""
    if capability == 'edit':
        attribute_mappings = {
            'mac': 'change_mac' if current_user.network_access_active.raw_value else 'activate_network_access',
            'userdb_status': 'hosting',
            'mail': 'change_mail',
            'mail_forwarded': 'change_mail',
            'mail_confirmed': 'resend_confirm_mail',
            'wifi_password': 'reset_wifi_password',
            'finance_balance': 'finance_logs',
        }

        assert attribute in attribute_mappings.keys(), \
            f"No edit endpoint for attribute `{attribute}`"
    else:
        assert capability == 'delete', "capability must be 'delete' or 'edit'"

        attribute_mappings = {
            'userdb_status': 'hosting',
        }

        assert attribute in attribute_mappings.keys(), \
            f"No delete endpoint for attribute `{attribute}`"

    return f"{bp_usersuite.name}.{attribute_mappings[attribute]}"


@bp_usersuite.route("/change-password", methods=['GET', 'POST'])
@login_required
@password_changeable(current_user)
def change_password():
    """Frontend page to change the user's password"""
    form = ChangePasswordForm()

    if form.validate_on_submit():
        old = form.old.data
        new = form.new.data

        try:
            current_user.change_password(old, new)
        except PasswordInvalid:
            flash(gettext("Altes Passwort war inkorrekt!"), "error")
        else:
            flash(gettext("Passwort wurde geändert"), "success")
            return redirect(url_for('.index'))
    elif form.is_submitted():
        flash_formerrors(form)

    return render_template("generic_form.html", page_title=gettext("Passwort ändern"),
                           form_args={'form': form, 'reset_button': True, 'cancel_to': url_for('.index')})


@bp_usersuite.route("/change-mail", methods=['GET', 'POST'])
@login_required
def change_mail():
    """Frontend page to change the user's mail address"""

    capability_or_403('mail', 'edit')

    form = ChangeMailForm()

    if form.validate_on_submit():
        password = form.password.data
        email = form.email.data

        try:
            current_user.change_mail(
                password=password,
                new_mail=email,
                mail_forwarded=form.forwarded.data,
            )
        except UserNotFound:
            flash(gettext("Nutzer nicht gefunden!"), "error")
        except PasswordInvalid:
            flash(gettext("Passwort war inkorrekt!"), "error")
        else:
            flash(gettext("E-Mail-Adresse wurde geändert"), "success")
            return redirect(url_for('.index'))
    elif form.is_submitted():
        flash_formerrors(form)
    else:
        form.email.data = current_user.mail.raw_value
        form.forwarded.data = current_user.mail_forwarded.raw_value

    return render_template('generic_form.html',
                           page_title=gettext("E-Mail-Adresse ändern"),
                           form_args={'form': form, 'cancel_to': url_for('.index')})


@bp_usersuite.route("/resend-confirm-mail", methods=['GET', 'POST'])
@login_required
def resend_confirm_mail():
    """Frontend page to resend confirmation mail"""

    capability_or_403('mail', 'edit')

    form = FlaskForm()

    if form.validate_on_submit():
        if current_user.resend_confirm_mail():
            logger.info('Successfully resent confirmation mail',
                        extra={'tags': {'rate_critical': True}})
            flash(gettext('Wir haben dir eine E-Mail mit einem Bestätigungslink geschickt.'), 'success')
        else:
            flash(gettext('Versenden der Bestätigungs-E-Mail ist fehlgeschlagen!'), 'error')

        return redirect(url_for('.index'))
    elif form.is_submitted():
        flash_formerrors(form)

    form_args = {
        'form': form,
        'cancel_to': url_for('.index'),
        'submit_text': gettext('E-Mail mit Bestätigungslink erneut senden')
    }

    return render_template('generic_form.html',
                           page_title=gettext("Bestätigung deiner E-Mail-Adresse"),
                           form_args=form_args)


@bp_usersuite.route("/change-mac", methods=['GET', 'POST'])
@login_required
def change_mac():
    """As user, change the MAC address of your device.
    """

    capability_or_403('mac', 'edit')

    form = ChangeMACForm()

    if form.validate_on_submit():
        password = form.password.data
        mac = form.mac.data
        host_name = form.host_name.data

        try:
            current_user.change_mac_address(mac, host_name, password)
        except PasswordInvalid:
            flash(gettext("Passwort war inkorrekt!"), "error")
        except MacAlreadyExists:
            flash(gettext("MAC-Adresse ist bereits in Verwendung!"), "error")
        else:
            logger.info('Successfully changed MAC address',
                        extra={'data': {'mac': mac},
                               'tags': {'rate_critical': True}})

            flash(gettext("MAC-Adresse wurde geändert!"), 'success')
            flash(gettext("Es kann bis zu 15 Minuten dauern, "
                          "bis die Änderung wirksam ist."), 'info')

            return redirect(url_for('.index'))

    elif form.is_submitted():
        flash_formerrors(form)

    form.mac.default = current_user.mac.value

    return render_template('usersuite/change_mac.html',
                           form_args={'form': form, 'cancel_to': url_for('.index')})


@bp_usersuite.route("/activate-network-access", methods=['GET', 'POST'])
@login_required
def activate_network_access():
    """As user, activate your network access
    """

    capability_or_403('network_access_active', 'edit')

    form = ActivateNetworkAccessForm(birthdate=current_user.birthdate.raw_value)

    if form.validate_on_submit():
        password = form.password.data
        mac = form.mac.data
        birthdate = form.birthdate.data
        host_name = form.host_name.data

        try:
            current_user.activate_network_access(password, mac, birthdate, host_name)
        except PasswordInvalid:
            flash(gettext("Passwort war inkorrekt!"), "error")
        except MacAlreadyExists:
            flash(gettext("MAC-Adresse ist bereits in Verwendung!"), "error")
        except SubnetFull:
            flash(gettext("Es sind nicht mehr genug freie IPv4 Adressen verfügbar. Bitte kontaktiere den Support."),  "error")
        else:
            logger.info('Successfully activated network access',
                        extra={'data': {'mac': mac, 'birthdate': birthdate, 'host_name': host_name},
                               'tags': {'rate_critical': True}})

            flash(gettext("Netzwerkzugang wurde aktiviert!"), 'success')
            flash(gettext("Es kann bis zu 10 Minuten dauern, "
                          "bis der Netzwerkzugang funktioniert."), 'info')

            return redirect(url_for('.index'))

    elif form.is_submitted():
        flash_formerrors(form)

    return render_template('generic_form.html', page_title=gettext("Netzwerkanschluss aktivieren"),
                           form_args={'form': form, 'cancel_to': url_for('.index')})


@bp_usersuite.route("/hosting", methods=['GET', 'POST'])
@bp_usersuite.route("/hosting/<string:action>", methods=['GET', 'POST'])
@login_required
def hosting(action=None):
    """Change various settings for Helios.
    """
    if not current_user.has_property("userdb"):
        abort(403)

    if action == "confirm":
        current_user.userdb.drop()
        flash(gettext("Deine Datenbank wurde gelöscht."), 'success')
        return redirect(url_for('.hosting'))

    form = HostingForm()

    if form.validate_on_submit():
        if form.action.data == "create":
            current_user.userdb.create(form.password.data)
            flash(gettext("Deine Datenbank wurde erstellt."), 'success')
        else:
            current_user.userdb.change_password(form.password.data)
    elif form.is_submitted():
        flash_formerrors(form)

    try:
        user_has_db = current_user.userdb.has_db
    except NotImplementedError:
        abort(403)

    return render_template('usersuite/hosting.html',
                           form=form, user_has_db=user_has_db, action=action)


@bp_usersuite.route("/finance-logs")
@login_required
def finance_logs():
    return redirect(url_for('usersuite.index', _anchor='transaction-log'))


@bp_usersuite.route("/terminate-membership", methods=['GET', 'POST'])
@login_required
def terminate_membership():
    """
    As member, cancel your membership to a given date
    :return:
    """

    capability_or_403('membership_end_date', 'edit')

    if current_user.membership_end_date.raw_value is not None:
        abort(403)

    form = TerminateMembershipForm()

    if form.validate_on_submit():
        end_date = form.end_date.data

        return redirect(url_for('.terminate_membership_confirm',
                                end_date=end_date))
    elif form.is_submitted():
        flash_formerrors(form)

    form_args = {
        'form': form,
        'cancel_to': url_for('.index'),
        'submit_text': gettext('Weiter')
    }

    return render_template('generic_form.html',
                           page_title=gettext("Mitgliedschaft beenden"),
                           form_args=form_args)


@bp_usersuite.route("/terminate-membership/confirm", methods=['GET', 'POST'])
@login_required
def terminate_membership_confirm():
    """
    As member, cancel your membership to a given date
    :return:
    """

    capability_or_403('membership_end_date', 'edit')

    if current_user.membership_end_date.raw_value is not None:
        abort(403)

    end_date = request.args.get("end_date", None, lambda x: datetime.strptime(x, '%Y-%m-%d').date())

    form = TerminateMembershipConfirmForm()

    if end_date is not None:
        try:
            form.estimated_balance.data = str(current_user.estimate_balance(
                end_date))

        except UnknownError:
            flash(gettext("Unbekannter Fehler!"), "error")
        else:
            form.end_date.data = end_date
    else:
        return redirect(url_for('.terminate_membership'))

    if form.validate_on_submit():
        try:
            current_user.terminate_membership(form.end_date.data)
        except TerminationNotPossible:
            flash(gettext("Beendigung der Mitgliedschaft nicht möglich!"), "error")
        except MacAlreadyExists:
            flash(gettext("Unbekannter Fehler!"), "error")
        else:
            logger.info('Successfully scheduled membership termination',
                        extra={'data': {'end_date': form.end_date.data},
                               'tags': {'rate_critical': True}})

            flash(gettext("Deine Mitgliedschaft wird zum angegebenen Datum beendet."), 'success')

        return redirect(url_for('.index'))
    elif form.is_submitted():
        flash_formerrors(form)

    form_args = {
        'form': form,
        'cancel_to': url_for('.terminate_membership')
    }

    return render_template('generic_form.html',
                           page_title=gettext("Mitgliedschaft beenden - Bestätigen"),
                           form_args=form_args)


@bp_usersuite.route("/continue-membership", methods=['GET', 'POST'])
@login_required
def continue_membership():
    """
    Cancel termination of membership
    :return:
    """

    capability_or_403('membership_end_date', 'edit')

    if current_user.membership_end_date.raw_value is None:
        abort(403)

    form = ContinueMembershipForm()

    if form.validate_on_submit():
        try:
            current_user.continue_membership()
        except ContinuationNotPossible:
            flash(gettext("Fortsetzung der Mitgliedschaft nicht möglich!"), "error")
        except UnknownError:
            flash(gettext("Unbekannter Fehler!"), "error")
        else:
            logger.info('Successfully cancelled membership termination',
                        extra={'tags': {'rate_critical': True}})

            flash(gettext("Deine Mitgliedschaft wird fortgesetzt."), 'success')

        return redirect(url_for('.index'))
    elif form.is_submitted():
        flash_formerrors(form)

    form_args = {
        'form': form,
        'cancel_to': url_for('.index')
    }

    return render_template('generic_form.html',
                           page_title=gettext("Mitgliedschaft fortsetzen"),
                           form_args=form_args)


@bp_usersuite.route("/reset-wifi-password", methods=['GET', 'POST'])
@login_required
def reset_wifi_password():
    """
    Reset the wifi password
    """

    form = FlaskForm()

    capability_or_403('wifi_password', 'edit')

    if form.validate_on_submit():
        try:
            new_password = current_user.reset_wifi_password()
        except UnknownError:
            flash(gettext("Unbekannter Fehler!"), "error")
        else:
            logger.info('Successfully reset wifi password',
                        extra={'tags': {'rate_critical': True}})

            flash(Markup("{}:<pre>{}</pre>".format(gettext("Es wurde ein neues WLAN Passwort generiert"), new_password)), 'success')

        return redirect(url_for('.index'))
    elif form.is_submitted():
        flash_formerrors(form)

    form_args = {
        'form': form,
        'cancel_to': url_for('.index'),
        'submit_text': gettext('Neues WLAN Passwort generieren')
    }

    return render_template('generic_form.html',
                           page_title=gettext("Neues WLAN Passwort"),
                           form_args=form_args)


@bp_usersuite.route("/get-apple-wlan-mobileconfig", methods=["GET"])
@login_required
def get_apple_wlan_mobileconfig():
    """
    Get the mobileconfig for the agdsn WLAN for an Apple device.
    """

    login = current_user.login.raw_value
    wifi_password = current_user.wifi_password.raw_value

    if not wifi_password:
        abort(404)

    return send_file(
        BytesIO(
            bytes(
                render_template(
                    "apple-mobileconfig.xml.j2",
                    login=login,
                    wifi_password=wifi_password,
                ),
                encoding="utf-8",
            )
        ),
        as_attachment=True,
        download_name="agdsn.mobileconfig",
    )